Skip to content

Commit

Permalink
move physical opt and cost misc to util and split plan interface
Browse files Browse the repository at this point in the history
Signed-off-by: AilinKid <[email protected]>
  • Loading branch information
AilinKid committed Mar 29, 2024
1 parent 8d9e67b commit a8855bb
Show file tree
Hide file tree
Showing 44 changed files with 1,817 additions and 1,376 deletions.
344 changes: 344 additions & 0 deletions go.sum

Large diffs are not rendered by default.

59 changes: 30 additions & 29 deletions pkg/planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util/coreusage"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/table"
Expand Down Expand Up @@ -858,16 +859,16 @@ func (e *Explain) RenderResult() error {
pp, ok := e.TargetPlan.(PhysicalPlan)
if ok {
if _, err := getPlanCost(pp, property.RootTaskType,
NewDefaultPlanCostOption().WithCostFlag(CostFlagRecalculate|CostFlagUseTrueCardinality|CostFlagTrace)); err != nil {
coreusage.NewDefaultPlanCostOption().WithCostFlag(coreusage.CostFlagRecalculate|coreusage.CostFlagUseTrueCardinality|coreusage.CostFlagTrace)); err != nil {
return err
}
if pp.SCtx().GetSessionVars().CostModelVersion == modelVer2 {
// output cost formula and factor costs through warning under model ver2 and true_card_cost mode for cost calibration.
cost, _ := pp.getPlanCostVer2(property.RootTaskType, NewDefaultPlanCostOption())
if cost.trace != nil {
trace := cost.trace
pp.SCtx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("cost formula: %v", trace.formula))
data, err := json.Marshal(trace.factorCosts)
cost, _ := pp.GetPlanCostVer2(property.RootTaskType, coreusage.NewDefaultPlanCostOption())
if cost.GetTrace() != nil {
trace := cost.GetTrace()
pp.SCtx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("cost formula: %v", trace.GetFormula()))
data, err := json.Marshal(trace.GetFactorCosts())
if err != nil {
pp.SCtx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("marshal factor costs error %v", err))
}
Expand All @@ -877,7 +878,7 @@ func (e *Explain) RenderResult() error {
factors := defaultVer2Factors.tolist()
weights := make(map[string]float64)
for _, factor := range factors {
if factorCost, ok := trace.factorCosts[factor.Name]; ok && factor.Value > 0 {
if factorCost, ok := trace.GetFactorCosts()[factor.Name]; ok && factor.Value > 0 {
weights[factor.Name] = factorCost / factor.Value // cost = [factors] * [weights]
}
}
Expand All @@ -897,7 +898,7 @@ func (e *Explain) RenderResult() error {
if pp, ok := e.TargetPlan.(PhysicalPlan); ok {
// trigger getPlanCost again with CostFlagTrace to record all cost formulas
if _, err := getPlanCost(pp, property.RootTaskType,
NewDefaultPlanCostOption().WithCostFlag(CostFlagRecalculate|CostFlagTrace)); err != nil {
coreusage.NewDefaultPlanCostOption().WithCostFlag(coreusage.CostFlagRecalculate|coreusage.CostFlagTrace)); err != nil {
return err
}
}
Expand Down Expand Up @@ -1142,15 +1143,15 @@ func (e *Explain) getOperatorInfo(p Plan, id string) (estRows, estCost, costForm
estCost = "N/A"
costFormula = "N/A"
if isPhysicalPlan {
estRows = strconv.FormatFloat(pp.getEstRowCountForDisplay(), 'f', 2, 64)
estRows = strconv.FormatFloat(pp.GetEstRowCountForDisplay(), 'f', 2, 64)
if e.SCtx() != nil && e.SCtx().GetSessionVars().CostModelVersion == modelVer2 {
costVer2, _ := pp.getPlanCostVer2(property.RootTaskType, NewDefaultPlanCostOption())
estCost = strconv.FormatFloat(costVer2.cost, 'f', 2, 64)
if costVer2.trace != nil {
costFormula = costVer2.trace.formula
costVer2, _ := pp.GetPlanCostVer2(property.RootTaskType, coreusage.NewDefaultPlanCostOption())
estCost = strconv.FormatFloat(costVer2.GetCost(), 'f', 2, 64)
if costVer2.GetTrace() != nil {
costFormula = costVer2.GetTrace().GetFormula()
}
} else {
planCost, _ := getPlanCost(pp, property.RootTaskType, NewDefaultPlanCostOption())
planCost, _ := getPlanCost(pp, property.RootTaskType, coreusage.NewDefaultPlanCostOption())
estCost = strconv.FormatFloat(planCost, 'f', 2, 64)
}
} else if si := p.StatsInfo(); si != nil {
Expand Down Expand Up @@ -1219,9 +1220,9 @@ func binaryOpTreeFromFlatOps(explainCtx PlanContext, ops FlatPlanTree) *tipb.Exp
return &s[0]
}

func binaryOpFromFlatOp(explainCtx PlanContext, op *FlatOperator, out *tipb.ExplainOperator) {
out.Name = op.Origin.ExplainID().String()
switch op.Label {
func binaryOpFromFlatOp(explainCtx PlanContext, fop *FlatOperator, out *tipb.ExplainOperator) {
out.Name = fop.Origin.ExplainID().String()
switch fop.Label {
case BuildSide:
out.Labels = []tipb.OperatorLabel{tipb.OperatorLabel_buildSide}
case ProbeSide:
Expand All @@ -1231,18 +1232,18 @@ func binaryOpFromFlatOp(explainCtx PlanContext, op *FlatOperator, out *tipb.Expl
case RecursivePart:
out.Labels = []tipb.OperatorLabel{tipb.OperatorLabel_recursivePart}
}
switch op.StoreType {
switch fop.StoreType {
case kv.TiDB:
out.StoreType = tipb.StoreType_tidb
case kv.TiKV:
out.StoreType = tipb.StoreType_tikv
case kv.TiFlash:
out.StoreType = tipb.StoreType_tiflash
}
if op.IsRoot {
if fop.IsRoot {
out.TaskType = tipb.TaskType_root
} else {
switch op.ReqType {
switch fop.ReqType {
case Cop:
out.TaskType = tipb.TaskType_cop
case BatchCop:
Expand All @@ -1252,16 +1253,16 @@ func binaryOpFromFlatOp(explainCtx PlanContext, op *FlatOperator, out *tipb.Expl
}
}

if op.IsPhysicalPlan {
p := op.Origin.(PhysicalPlan)
out.Cost, _ = getPlanCost(p, property.RootTaskType, NewDefaultPlanCostOption())
out.EstRows = p.getEstRowCountForDisplay()
} else if statsInfo := op.Origin.StatsInfo(); statsInfo != nil {
if fop.IsPhysicalPlan {
p := fop.Origin.(PhysicalPlan)
out.Cost, _ = getPlanCost(p, property.RootTaskType, coreusage.NewDefaultPlanCostOption())
out.EstRows = p.GetEstRowCountForDisplay()
} else if statsInfo := fop.Origin.StatsInfo(); statsInfo != nil {
out.EstRows = statsInfo.RowCount
}

// Runtime info
rootStats, copStats, memTracker, diskTracker := getRuntimeInfo(explainCtx, op.Origin, nil)
rootStats, copStats, memTracker, diskTracker := getRuntimeInfo(explainCtx, fop.Origin, nil)
if rootStats != nil {
basic, groups := rootStats.MergeStats()
if basic != nil {
Expand Down Expand Up @@ -1291,14 +1292,14 @@ func binaryOpFromFlatOp(explainCtx PlanContext, op *FlatOperator, out *tipb.Expl
}

// Operator info
if plan, ok := op.Origin.(dataAccesser); ok {
if plan, ok := fop.Origin.(dataAccesser); ok {
out.OperatorInfo = plan.OperatorInfo(false)
} else {
out.OperatorInfo = op.Origin.ExplainInfo()
out.OperatorInfo = fop.Origin.ExplainInfo()
}

// Access object
switch p := op.Origin.(type) {
switch p := fop.Origin.(type) {
case dataAccesser:
ao := p.AccessObject()
if ao != nil {
Expand Down
58 changes: 29 additions & 29 deletions pkg/planner/core/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,26 @@ func EncodeFlatPlan(flat *FlatPhysicalPlan) string {
buf.Grow(80 * opCount)
encodeFlatPlanTree(flat.Main, 0, &buf)
for _, cte := range flat.CTEs {
op := cte[0]
fop := cte[0]
cteDef := cte[0].Origin.(*CTEDefinition)
id := cteDef.CTE.IDForStorage
tp := plancodec.TypeCTEDefinition
taskTypeInfo := plancodec.EncodeTaskType(op.IsRoot, op.StoreType)
p := op.Origin
taskTypeInfo := plancodec.EncodeTaskType(fop.IsRoot, fop.StoreType)
p := fop.Origin
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfoStr(p.SCtx(), p, nil)
var estRows float64
if op.IsPhysicalPlan {
estRows = op.Origin.(PhysicalPlan).getEstRowCountForDisplay()
if fop.IsPhysicalPlan {
estRows = fop.Origin.(PhysicalPlan).GetEstRowCountForDisplay()
} else if statsInfo := p.StatsInfo(); statsInfo != nil {
estRows = statsInfo.RowCount
}
plancodec.EncodePlanNode(
int(op.Depth),
strconv.Itoa(id)+op.Label.String(),
int(fop.Depth),
strconv.Itoa(id)+fop.Label.String(),
tp,
estRows,
taskTypeInfo,
op.Origin.ExplainInfo(),
fop.Origin.ExplainInfo(),
actRows,
analyzeInfo,
memoryInfo,
Expand All @@ -96,40 +96,40 @@ func EncodeFlatPlan(flat *FlatPhysicalPlan) string {

func encodeFlatPlanTree(flatTree FlatPlanTree, offset int, buf *bytes.Buffer) {
for i := 0; i < len(flatTree); {
op := flatTree[i]
taskTypeInfo := plancodec.EncodeTaskType(op.IsRoot, op.StoreType)
p := op.Origin
fop := flatTree[i]
taskTypeInfo := plancodec.EncodeTaskType(fop.IsRoot, fop.StoreType)
p := fop.Origin
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfoStr(p.SCtx(), p, nil)
var estRows float64
if op.IsPhysicalPlan {
estRows = op.Origin.(PhysicalPlan).getEstRowCountForDisplay()
if fop.IsPhysicalPlan {
estRows = fop.Origin.(PhysicalPlan).GetEstRowCountForDisplay()
} else if statsInfo := p.StatsInfo(); statsInfo != nil {
estRows = statsInfo.RowCount
}
plancodec.EncodePlanNode(
int(op.Depth),
strconv.Itoa(op.Origin.ID())+op.Label.String(),
op.Origin.TP(),
int(fop.Depth),
strconv.Itoa(fop.Origin.ID())+fop.Label.String(),
fop.Origin.TP(),
estRows,
taskTypeInfo,
op.Origin.ExplainInfo(),
fop.Origin.ExplainInfo(),
actRows,
analyzeInfo,
memoryInfo,
diskInfo,
buf,
)

if op.NeedReverseDriverSide {
if fop.NeedReverseDriverSide {
// If NeedReverseDriverSide is true, we don't rely on the order of flatTree.
// Instead, we manually slice the build and probe side children from flatTree and recursively call
// encodeFlatPlanTree to keep build side before probe side.
buildSide := flatTree[op.ChildrenIdx[1]-offset : op.ChildrenEndIdx+1-offset]
probeSide := flatTree[op.ChildrenIdx[0]-offset : op.ChildrenIdx[1]-offset]
encodeFlatPlanTree(buildSide, op.ChildrenIdx[1], buf)
encodeFlatPlanTree(probeSide, op.ChildrenIdx[0], buf)
buildSide := flatTree[fop.ChildrenIdx[1]-offset : fop.ChildrenEndIdx+1-offset]
probeSide := flatTree[fop.ChildrenIdx[0]-offset : fop.ChildrenIdx[1]-offset]
encodeFlatPlanTree(buildSide, fop.ChildrenIdx[1], buf)
encodeFlatPlanTree(probeSide, fop.ChildrenIdx[0], buf)
// Skip the children plan tree of the current operator.
i = op.ChildrenEndIdx + 1 - offset
i = fop.ChildrenEndIdx + 1 - offset
} else {
// Normally, we just go to the next element in the slice.
i++
Expand Down Expand Up @@ -210,7 +210,7 @@ func (pn *planEncoder) encodePlan(p Plan, isRoot bool, store kv.StoreType, depth
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfoStr(p.SCtx(), p, nil)
rowCount := 0.0
if pp, ok := p.(PhysicalPlan); ok {
rowCount = pp.getEstRowCountForDisplay()
rowCount = pp.GetEstRowCountForDisplay()
} else if statsInfo := p.StatsInfo(); statsInfo != nil {
rowCount = statsInfo.RowCount
}
Expand Down Expand Up @@ -283,12 +283,12 @@ func NormalizeFlatPlan(flat *FlatPhysicalPlan) (normalized string, digest *parse
}()
// assume an operator costs around 30 bytes, preallocate space for them
d.buf.Grow(30 * len(selectPlan))
for _, op := range selectPlan {
taskTypeInfo := plancodec.EncodeTaskTypeForNormalize(op.IsRoot, op.StoreType)
p := op.Origin.(PhysicalPlan)
for _, fop := range selectPlan {
taskTypeInfo := plancodec.EncodeTaskTypeForNormalize(fop.IsRoot, fop.StoreType)
p := fop.Origin.(PhysicalPlan)
plancodec.NormalizePlanNode(
int(op.Depth-uint32(selectPlanOffset)),
op.Origin.TP(),
int(fop.Depth-uint32(selectPlanOffset)),
fop.Origin.TP(),
taskTypeInfo,
p.ExplainNormalizedInfo(),
&d.buf,
Expand Down
30 changes: 15 additions & 15 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (p *LogicalJoin) getHashJoin(prop *property.PhysicalProperty, innerIdx int,
func (p *LogicalJoin) constructIndexJoin(
prop *property.PhysicalProperty,
outerIdx int,
innerTask task,
innerTask Task,
ranges ranger.MutableRanges,
keyOff2IdxOff []int,
path *util.AccessPath,
Expand Down Expand Up @@ -576,7 +576,7 @@ func (p *LogicalJoin) constructIndexJoin(
func (p *LogicalJoin) constructIndexMergeJoin(
prop *property.PhysicalProperty,
outerIdx int,
innerTask task,
innerTask Task,
ranges ranger.MutableRanges,
keyOff2IdxOff []int,
path *util.AccessPath,
Expand Down Expand Up @@ -683,7 +683,7 @@ func (p *LogicalJoin) constructIndexMergeJoin(
func (p *LogicalJoin) constructIndexHashJoin(
prop *property.PhysicalProperty,
outerIdx int,
innerTask task,
innerTask Task,
ranges ranger.MutableRanges,
keyOff2IdxOff []int,
path *util.AccessPath,
Expand Down Expand Up @@ -831,7 +831,7 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan(
keyOff2IdxOff := make([]int, len(innerJoinKeys))
newOuterJoinKeys := make([]*expression.Column, 0)
var ranges ranger.MutableRanges = ranger.Ranges{}
var innerTask, innerTask2 task
var innerTask, innerTask2 Task
var helper *indexJoinBuildHelper
if ds.tableInfo.IsCommonHandle {
helper, keyOff2IdxOff = p.getIndexJoinBuildHelper(ds, innerJoinKeys, func(path *util.AccessPath) bool { return path.IsCommonHandlePath }, outerJoinKeys)
Expand Down Expand Up @@ -1023,7 +1023,7 @@ func (p *LogicalJoin) constructInnerTableScanTask(
keepOrder bool,
desc bool,
rowCount float64,
) task {
) Task {
ds := wrapper.ds
// If `ds.tableInfo.GetPartitionInfo() != nil`,
// it means the data source is a partition table reader.
Expand Down Expand Up @@ -1088,9 +1088,9 @@ func (p *LogicalJoin) constructInnerTableScanTask(
ts.PlanPartInfo = copTask.physPlanPartInfo
selStats := ts.StatsInfo().Scale(selectivity)
ts.addPushedDownSelection(copTask, selStats)
t := copTask.convertToRootTask(ds.SCtx())
reader := t.p
t.p = p.constructInnerByWrapper(wrapper, reader)
t := copTask.ConvertToRootTask(ds.SCtx())
reader := t.GetPlan()
t.SetPlan(p.constructInnerByWrapper(wrapper, reader))
return t
}

Expand Down Expand Up @@ -1209,7 +1209,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
desc bool,
rowCount float64,
maxOneRow bool,
) task {
) Task {
ds := wrapper.ds
// If `ds.tableInfo.GetPartitionInfo() != nil`,
// it means the data source is a partition table reader.
Expand Down Expand Up @@ -1376,9 +1376,9 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
}
finalStats := ds.tableStats.ScaleByExpectCnt(rowCount)
is.addPushedDownSelection(cop, ds, tmpPath, finalStats)
t := cop.convertToRootTask(ds.SCtx())
reader := t.p
t.p = p.constructInnerByWrapper(wrapper, reader)
t := cop.ConvertToRootTask(ds.SCtx())
reader := t.GetPlan()
t.SetPlan(p.constructInnerByWrapper(wrapper, reader))
return t
}

Expand Down Expand Up @@ -2565,7 +2565,7 @@ func (p *LogicalProjection) TryToGetChildProp(prop *property.PhysicalProperty) (
return newProp, true
}

// exhaustPhysicalPlans enumerate all the possible physical plan for expand operator (currently only mpp case is supported)
// exhaustop.PhysicalPlans enumerate all the possible physical plan for expand operator (currently only mpp case is supported)
func (p *LogicalExpand) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
// under the mpp task type, if the sort item is not empty, refuse it, cause expanded data doesn't support any sort items.
if !prop.IsSortItemEmpty() {
Expand Down Expand Up @@ -2942,9 +2942,9 @@ func (lw *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) (
return windows, true, nil
}

// exhaustPhysicalPlans is only for implementing interface. DataSource and Dual generate task in `findBestTask` directly.
// exhaustop.PhysicalPlans is only for implementing interface. DataSource and Dual generate task in `findBestTask` directly.
func (*baseLogicalPlan) exhaustPhysicalPlans(*property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
panic("baseLogicalPlan.exhaustPhysicalPlans() should never be called.")
panic("baseLogicalPlan.exhaustop.PhysicalPlans() should never be called.")
}

// canPushToCop checks if it can be pushed to some stores. For TiKV, it only checks datasource.
Expand Down
Loading

0 comments on commit a8855bb

Please sign in to comment.