Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: move logical datasource, table-scan, index-scan, tikv-gather to logicalop pkg #56291

Merged
merged 7 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/planner/cardinality/selectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func TestSelectivity(t *testing.T) {
require.NoErrorf(t, err, "for building plan, expr %s", err, tt.exprs)

sel := p.(base.LogicalPlan).Children()[0].(*logicalop.LogicalSelection)
ds := sel.Children()[0].(*plannercore.DataSource)
ds := sel.Children()[0].(*logicalop.DataSource)

histColl := statsTbl.GenerateHistCollFromColumnInfo(ds.TableInfo, ds.Schema().Columns)

Expand Down Expand Up @@ -515,7 +515,7 @@ func TestDNFCondSelectivity(t *testing.T) {
require.NoErrorf(t, err, "error %v, for building plan, sql %s", err, tt)

sel := p.(base.LogicalPlan).Children()[0].(*logicalop.LogicalSelection)
ds := sel.Children()[0].(*plannercore.DataSource)
ds := sel.Children()[0].(*logicalop.DataSource)

histColl := statsTbl.GenerateHistCollFromColumnInfo(ds.TableInfo, ds.Schema().Columns)

Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cardinality/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func TestTraceDebugSelectivity(t *testing.T) {
require.NoError(t, err)

sel := p.(base.LogicalPlan).Children()[0].(*logicalop.LogicalSelection)
ds := sel.Children()[0].(*plannercore.DataSource)
ds := sel.Children()[0].(*logicalop.DataSource)

dsSchemaCols = append(dsSchemaCols, ds.Schema().Columns)
selConditions = append(selConditions, sel.Conditions)
Expand Down
10 changes: 5 additions & 5 deletions pkg/planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (*ImplTiKVSingleReadGather) Match(_ *memo.GroupExpr, _ *property.PhysicalPr
// OnImplement implements ImplementationRule OnImplement interface.
func (*ImplTiKVSingleReadGather) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) {
logicProp := expr.Group.Prop
sg := expr.ExprNode.(*plannercore.TiKVSingleGather)
sg := expr.ExprNode.(*logicalop.TiKVSingleGather)
if sg.IsIndexGather {
reader := plannercore.GetPhysicalIndexReader(sg, logicProp.Schema, logicProp.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt), reqProp)
return []memo.Implementation{impl.NewIndexReaderImpl(reader, sg.Source)}, nil
Expand All @@ -190,14 +190,14 @@ type ImplTableScan struct {

// Match implements ImplementationRule Match interface.
func (*ImplTableScan) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
ts := expr.ExprNode.(*plannercore.LogicalTableScan)
ts := expr.ExprNode.(*logicalop.LogicalTableScan)
return prop.IsSortItemEmpty() || (len(prop.SortItems) == 1 && ts.HandleCols != nil && prop.SortItems[0].Col.EqualColumn(ts.HandleCols.GetCol(0)))
}

// OnImplement implements ImplementationRule OnImplement interface.
func (*ImplTableScan) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) {
logicProp := expr.Group.Prop
logicalScan := expr.ExprNode.(*plannercore.LogicalTableScan)
logicalScan := expr.ExprNode.(*logicalop.LogicalTableScan)
ts := plannercore.GetPhysicalScan4LogicalTableScan(logicalScan, logicProp.Schema, logicProp.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt))
if !reqProp.IsSortItemEmpty() {
ts.KeepOrder = true
Expand All @@ -213,13 +213,13 @@ type ImplIndexScan struct {

// Match implements ImplementationRule Match interface.
func (*ImplIndexScan) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
is := expr.ExprNode.(*plannercore.LogicalIndexScan)
is := expr.ExprNode.(*logicalop.LogicalIndexScan)
return is.MatchIndexProp(prop)
}

// OnImplement implements ImplementationRule OnImplement interface.
func (*ImplIndexScan) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) ([]memo.Implementation, error) {
logicalScan := expr.ExprNode.(*plannercore.LogicalIndexScan)
logicalScan := expr.ExprNode.(*logicalop.LogicalIndexScan)
is := plannercore.GetPhysicalIndexScan4LogicalIndexScan(logicalScan, expr.Group.Prop.Schema, expr.Group.Prop.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt))
if !reqProp.IsSortItemEmpty() {
is.KeepOrder = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cascades/optimize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestPreparePossibleProperties(t *testing.T) {
require.NoError(t, err)

// collect the target columns: f, a
ds, ok := logic.Children()[0].Children()[0].(*plannercore.DataSource)
ds, ok := logic.Children()[0].Children()[0].(*logicalop.DataSource)
require.True(t, ok)

var columnF, columnA *expression.Column
Expand Down
18 changes: 9 additions & 9 deletions pkg/planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,15 @@ func NewRulePushSelDownTableScan() Transformation {
// the key ranges of the `ts` operator.
func (*PushSelDownTableScan) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
sel := old.GetExpr().ExprNode.(*logicalop.LogicalSelection)
ts := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalTableScan)
ts := old.Children[0].GetExpr().ExprNode.(*logicalop.LogicalTableScan)
if ts.HandleCols == nil {
return nil, false, false, nil
}
accesses, remained := ranger.DetachCondsForColumn(ts.SCtx().GetRangerCtx(), sel.Conditions, ts.HandleCols.GetCol(0))
if accesses == nil {
return nil, false, false, nil
}
newTblScan := plannercore.LogicalTableScan{
newTblScan := logicalop.LogicalTableScan{
Source: ts.Source,
HandleCols: ts.HandleCols,
AccessConds: ts.AccessConds.Shallow(),
Expand Down Expand Up @@ -251,7 +251,7 @@ func NewRulePushSelDownIndexScan() Transformation {
// or just keep the two GroupExprs unchanged.
func (*PushSelDownIndexScan) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
sel := old.GetExpr().ExprNode.(*logicalop.LogicalSelection)
is := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalIndexScan)
is := old.Children[0].GetExpr().ExprNode.(*logicalop.LogicalIndexScan)
if len(is.IdxCols) == 0 {
return nil, false, false, nil
}
Expand Down Expand Up @@ -283,7 +283,7 @@ func (*PushSelDownIndexScan) OnTransform(old *memo.ExprIter) (newExprs []*memo.G
}
}
// TODO: `res` still has some unused fields: EqOrInCount, IsDNFCond.
newIs := plannercore.LogicalIndexScan{
newIs := logicalop.LogicalIndexScan{
Source: is.Source,
IsDoubleRead: is.IsDoubleRead,
EqCondCount: res.EqCondCount,
Expand Down Expand Up @@ -332,7 +332,7 @@ func NewRulePushSelDownTiKVSingleGather() Transformation {
// 2. `remainedSel -> newTg -> pushedSel -> any`
func (*PushSelDownTiKVSingleGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
sel := old.GetExpr().ExprNode.(*logicalop.LogicalSelection)
sg := old.Children[0].GetExpr().ExprNode.(*plannercore.TiKVSingleGather)
sg := old.Children[0].GetExpr().ExprNode.(*logicalop.TiKVSingleGather)
childGroup := old.Children[0].Children[0].Group
var pushed, remained []expression.Expression
sctx := sg.SCtx()
Expand Down Expand Up @@ -378,7 +378,7 @@ func NewRuleEnumeratePaths() Transformation {

// OnTransform implements Transformation interface.
func (*EnumeratePaths) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
ds := old.GetExpr().ExprNode.(*plannercore.DataSource)
ds := old.GetExpr().ExprNode.(*logicalop.DataSource)
gathers := ds.Convert2Gathers()
for _, gather := range gathers {
expr := memo.Convert2GroupExpr(gather)
Expand Down Expand Up @@ -437,7 +437,7 @@ func (r *PushAggDownGather) Match(expr *memo.ExprIter) bool {
func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
agg := old.GetExpr().ExprNode.(*logicalop.LogicalAggregation)
aggSchema := old.GetExpr().Group.Prop.Schema
gather := old.Children[0].GetExpr().ExprNode.(*plannercore.TiKVSingleGather)
gather := old.Children[0].GetExpr().ExprNode.(*logicalop.TiKVSingleGather)
childGroup := old.Children[0].GetExpr().Children[0]
// The old Aggregation should stay unchanged for other transformation.
// So we build a new LogicalAggregation for the partialAgg.
Expand Down Expand Up @@ -1416,7 +1416,7 @@ func (r *PushTopNDownTiKVSingleGather) Match(expr *memo.ExprIter) bool {
func (r *PushTopNDownTiKVSingleGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
topN := old.GetExpr().ExprNode.(*logicalop.LogicalTopN)
topNSchema := old.Children[0].Group.Prop.Schema
gather := old.Children[0].GetExpr().ExprNode.(*plannercore.TiKVSingleGather)
gather := old.Children[0].GetExpr().ExprNode.(*logicalop.TiKVSingleGather)
childGroup := old.Children[0].GetExpr().Children[0]

particalTopN := logicalop.LogicalTopN{
Expand Down Expand Up @@ -1853,7 +1853,7 @@ func (r *PushLimitDownTiKVSingleGather) Match(expr *memo.ExprIter) bool {
func (r *PushLimitDownTiKVSingleGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
limit := old.GetExpr().ExprNode.(*logicalop.LogicalLimit)
limitSchema := old.Children[0].Group.Prop.Schema
gather := old.Children[0].GetExpr().ExprNode.(*plannercore.TiKVSingleGather)
gather := old.Children[0].GetExpr().ExprNode.(*logicalop.TiKVSingleGather)
childGroup := old.Children[0].GetExpr().Children[0]

particalLimit := logicalop.LogicalLimit{
Expand Down
6 changes: 1 addition & 5 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ go_library(
"indexmerge_path.go",
"indexmerge_unfinished_path.go",
"initialize.go",
"logical_datasource.go",
"logical_index_scan.go",
"logical_initialize.go",
"logical_plan_builder.go",
"logical_plans.go",
"logical_table_scan.go",
"logical_tikv_single_gather.go",
"memtable_infoschema_extractor.go",
"memtable_predicate_extractor.go",
"mock.go",
Expand Down Expand Up @@ -137,13 +133,13 @@ go_library(
"//pkg/planner/core/resolve",
"//pkg/planner/core/rule",
"//pkg/planner/core/rule/util",
"//pkg/planner/funcdep",
"//pkg/planner/planctx",
"//pkg/planner/property",
"//pkg/planner/util",
"//pkg/planner/util/coreusage",
"//pkg/planner/util/costusage",
"//pkg/planner/util/debugtrace",
"//pkg/planner/util/domainmisc",
"//pkg/planner/util/fixcontrol",
"//pkg/planner/util/optimizetrace",
"//pkg/planner/util/optimizetrace/logicaltrace",
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestGroupNDVs(t *testing.T) {
for i := 1; i < len(v.Children()); i++ {
stack = append(stack, v.Children()[i])
}
case *core.DataSource:
case *logicalop.DataSource:
if len(stack) == 0 {
traversed = true
} else {
Expand Down
16 changes: 8 additions & 8 deletions pkg/planner/core/collect_column_stats_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *columnStatsUsageCollector) updateColMapFromExpressions(col *expression.
c.updateColMap(col, expression.ExtractColumnsAndCorColumnsFromExpressions(c.cols[:0], list))
}

func (c *columnStatsUsageCollector) collectPredicateColumnsForDataSource(ds *DataSource) {
func (c *columnStatsUsageCollector) collectPredicateColumnsForDataSource(ds *logicalop.DataSource) {
// Skip all system tables.
if filter.IsSystemSchema(ds.DBName.L) {
return
Expand Down Expand Up @@ -178,7 +178,7 @@ func (c *columnStatsUsageCollector) collectPredicateColumnsForUnionAll(p *logica
}
}

func (c *columnStatsUsageCollector) addHistNeededColumns(ds *DataSource) {
func (c *columnStatsUsageCollector) addHistNeededColumns(ds *logicalop.DataSource) {
c.visitedPhysTblIDs.Insert(int(ds.PhysicalTableID))
if c.collectMode&collectHistNeededColumns == 0 {
return
Expand Down Expand Up @@ -232,12 +232,12 @@ func (c *columnStatsUsageCollector) collectFromPlan(lp base.LogicalPlan) {
}
if c.collectMode&collectPredicateColumns != 0 {
switch x := lp.(type) {
case *DataSource:
case *logicalop.DataSource:
c.collectPredicateColumnsForDataSource(x)
case *LogicalIndexScan:
case *logicalop.LogicalIndexScan:
c.collectPredicateColumnsForDataSource(x.Source)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *LogicalTableScan:
case *logicalop.LogicalTableScan:
c.collectPredicateColumnsForDataSource(x.Source)
c.addPredicateColumnsFromExpressions(x.AccessConds)
case *logicalop.LogicalProjection:
Expand Down Expand Up @@ -336,11 +336,11 @@ func (c *columnStatsUsageCollector) collectFromPlan(lp base.LogicalPlan) {
// Since c.visitedPhysTblIDs is also collected here and needs to be collected even collectHistNeededColumns is not set,
// so we do the c.collectMode check in addHistNeededColumns() after collecting c.visitedPhysTblIDs.
switch x := lp.(type) {
case *DataSource:
case *logicalop.DataSource:
c.addHistNeededColumns(x)
case *LogicalIndexScan:
case *logicalop.LogicalIndexScan:
c.addHistNeededColumns(x.Source)
case *LogicalTableScan:
case *logicalop.LogicalTableScan:
c.addHistNeededColumns(x.Source)
}
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/planner/core/core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func init() {
utilfuncp.FindBestTask4LogicalCTETable = findBestTask4LogicalCTETable
utilfuncp.FindBestTask4LogicalMemTable = findBestTask4LogicalMemTable
utilfuncp.FindBestTask4LogicalTableDual = findBestTask4LogicalTableDual
utilfuncp.FindBestTask4LogicalDataSource = findBestTask4LogicalDataSource
utilfuncp.FindBestTask4LogicalShowDDLJobs = findBestTask4LogicalShowDDLJobs
utilfuncp.ExhaustPhysicalPlans4LogicalCTE = exhaustPhysicalPlans4LogicalCTE
utilfuncp.ExhaustPhysicalPlans4LogicalSort = exhaustPhysicalPlans4LogicalSort
Expand All @@ -62,10 +63,16 @@ func init() {
utilfuncp.GetEstimatedProbeCntFromProbeParents = getEstimatedProbeCntFromProbeParents
utilfuncp.AppendCandidate4PhysicalOptimizeOp = appendCandidate4PhysicalOptimizeOp

utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan
utilfuncp.AttachPlan2Task = attachPlan2Task
utilfuncp.WindowIsTopN = windowIsTopN
utilfuncp.DoOptimize = doOptimize
utilfuncp.IsSingleScan = isSingleScan
utilfuncp.WindowIsTopN = windowIsTopN
utilfuncp.AttachPlan2Task = attachPlan2Task
utilfuncp.AddPrefix4ShardIndexes = addPrefix4ShardIndexes
utilfuncp.DeriveStats4DataSource = deriveStats4DataSource
utilfuncp.ApplyPredicateSimplification = applyPredicateSimplification
utilfuncp.DeriveStats4LogicalIndexScan = deriveStats4LogicalIndexScan
utilfuncp.DeriveStats4LogicalTableScan = deriveStats4LogicalTableScan
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan

// For mv index init.
cardinality.GetTblInfoForUsedStatsByPhysicalID = getTblInfoForUsedStatsByPhysicalID
Expand Down
12 changes: 6 additions & 6 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ func getIndexJoinByOuterIdx(p *logicalop.LogicalJoin, prop *property.PhysicalPro
// hasDitryWrite: whether the inner child contains dirty data.
// zippedChildren: [Projection, Aggregation, Selection]
type indexJoinInnerChildWrapper struct {
ds *DataSource
ds *logicalop.DataSource
hasDitryWrite bool
zippedChildren []base.LogicalPlan
}
Expand All @@ -762,7 +762,7 @@ func extractIndexJoinInnerChildPattern(p *logicalop.LogicalJoin, innerChild base
childLoop:
for curChild := innerChild; curChild != nil; curChild = nextChild(curChild) {
switch child := curChild.(type) {
case *DataSource:
case *logicalop.DataSource:
wrapper.ds = child
break childLoop
case *logicalop.LogicalProjection, *logicalop.LogicalSelection, *logicalop.LogicalAggregation:
Expand Down Expand Up @@ -826,7 +826,7 @@ func buildIndexJoinInner2TableScan(
ranges = indexJoinResult.chosenRanges
} else {
pkMatched := false
pkCol := ds.getPKIsHandleCol()
pkCol := ds.GetPKIsHandleCol()
if pkCol == nil {
return nil
}
Expand Down Expand Up @@ -1181,7 +1181,7 @@ func constructInnerIndexScanTask(
isPartition: ds.PartitionDefIdx != nil,
physicalTableID: ds.PhysicalTableID,
tblColHists: ds.TblColHists,
pkIsHandleCol: ds.getPKIsHandleCol(),
pkIsHandleCol: ds.GetPKIsHandleCol(),
}.Init(ds.SCtx(), ds.QueryBlockOffset())
cop := &CopTask{
indexPlan: is,
Expand Down Expand Up @@ -1338,7 +1338,7 @@ func constructInnerIndexScanTask(
// There are two kinds of agg: stream agg and hash agg. Stream agg depends on some conditions, such as the group by cols
//
// Step2: build other inner plan node to task
func constructIndexJoinInnerSideTaskWithAggCheck(p *logicalop.LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
func constructIndexJoinInnerSideTaskWithAggCheck(p *logicalop.LogicalJoin, prop *property.PhysicalProperty, dsCopTask *CopTask, ds *logicalop.DataSource, path *util.AccessPath, wrapper *indexJoinInnerChildWrapper) base.Task {
var la *logicalop.LogicalAggregation
var canPushAggToCop bool
if len(wrapper.zippedChildren) > 0 {
Expand Down Expand Up @@ -2421,7 +2421,7 @@ func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bo
ret := true
for _, ch := range p.Children() {
switch c := ch.(type) {
case *DataSource:
case *logicalop.DataSource:
validDs := false
indexMergeIsIntersection := false
for _, path := range c.PossibleAccessPaths {
Expand Down
4 changes: 2 additions & 2 deletions pkg/planner/core/exhaust_physical_plans_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func rewriteSimpleExpr(ctx expression.BuildContext, str string, schema *expressi
}

type indexJoinContext struct {
dataSourceNode *DataSource
dataSourceNode *logicalop.DataSource
dsNames types.NameSlice
path *util.AccessPath
joinNode *logicalop.LogicalJoin
Expand All @@ -66,7 +66,7 @@ func prepareForAnalyzeLookUpFilters() *indexJoinContext {
}()
ctx.GetSessionVars().PlanID.Store(-1)
joinNode := logicalop.LogicalJoin{}.Init(ctx.GetPlanCtx(), 0)
dataSourceNode := DataSource{}.Init(ctx.GetPlanCtx(), 0)
dataSourceNode := logicalop.DataSource{}.Init(ctx.GetPlanCtx(), 0)
dsSchema := expression.NewSchema()
var dsNames types.NameSlice
dsSchema.Append(&expression.Column{
Expand Down
Loading