Skip to content

Commit

Permalink
planner: introduce new cost formula for MPPAggs (pingcap#35436)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Jul 12, 2022
1 parent 3aef55e commit 1a876c9
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
31 changes: 27 additions & 4 deletions planner/core/plan_cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,12 @@ func (p *PhysicalTableReader) GetPlanCost(taskType property.TaskType, costFlag u
concurrency = float64(p.ctx.GetSessionVars().DistSQLScanConcurrency())
rowSize = getTblStats(p.tablePlan).GetAvgRowSize(p.ctx, p.tablePlan.Schema().Columns, false, false)
seekCost = estimateNetSeekCost(p.tablePlan)
childCost, err := p.tablePlan.GetPlanCost(property.CopSingleReadTaskType, costFlag)
tType := property.MppTaskType
if p.ctx.GetSessionVars().CostModelVersion == modelVer1 {
// regard the underlying tasks as cop-task on modelVer1 for compatibility
tType = property.CopSingleReadTaskType
}
childCost, err := p.tablePlan.GetPlanCost(tType, costFlag)
if err != nil {
return 0, err
}
Expand All @@ -326,7 +331,8 @@ func (p *PhysicalTableReader) GetPlanCost(taskType property.TaskType, costFlag u
// consider concurrency
p.planCost /= concurrency
// consider tidb_enforce_mpp
if isMPP && p.ctx.GetSessionVars().IsMPPEnforced() {
if isMPP && p.ctx.GetSessionVars().IsMPPEnforced() &&
!hasCostFlag(costFlag, CostFlagRecalculate) { // show the real cost in explain-statements
p.planCost /= 1000000000
}
}
Expand Down Expand Up @@ -892,12 +898,19 @@ func (p *PhysicalHashJoin) GetPlanCost(taskType property.TaskType, costFlag uint
}

// GetCost computes cost of stream aggregation considering CPU/memory.
func (p *PhysicalStreamAgg) GetCost(inputRows float64, isRoot bool, costFlag uint64) float64 {
func (p *PhysicalStreamAgg) GetCost(inputRows float64, isRoot, isMPP bool, costFlag uint64) float64 {
aggFuncFactor := p.getAggFuncCostFactor(false)
var cpuCost float64
sessVars := p.ctx.GetSessionVars()
if isRoot {
cpuCost = inputRows * sessVars.GetCPUFactor() * aggFuncFactor
} else if isMPP {
if p.ctx.GetSessionVars().CostModelVersion == modelVer2 {
// use the dedicated CPU factor for TiFlash on modelVer2
cpuCost = inputRows * sessVars.GetTiFlashCPUFactor() * aggFuncFactor
} else {
cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor
}
} else {
cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor
}
Expand All @@ -916,7 +929,7 @@ func (p *PhysicalStreamAgg) GetPlanCost(taskType property.TaskType, costFlag uin
return 0, err
}
p.planCost = childCost
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), taskType == property.RootTaskType, costFlag)
p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), taskType == property.RootTaskType, taskType == property.MppTaskType, costFlag)
p.planCostInit = true
return p.planCost, nil
}
Expand All @@ -936,6 +949,13 @@ func (p *PhysicalHashAgg) GetCost(inputRows float64, isRoot, isMPP bool, costFla
// Cost of additional goroutines.
cpuCost += (con + 1) * sessVars.GetConcurrencyFactor()
}
} else if isMPP {
if p.ctx.GetSessionVars().CostModelVersion == modelVer2 {
// use the dedicated CPU factor for TiFlash on modelVer2
cpuCost = inputRows * sessVars.GetTiFlashCPUFactor() * aggFuncFactor
} else {
cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor
}
} else {
cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor
}
Expand Down Expand Up @@ -1144,6 +1164,9 @@ func (p *PhysicalExchangeReceiver) GetPlanCost(taskType property.TaskType, costF
}

func getOperatorActRows(operator PhysicalPlan) float64 {
if operator == nil {
return 0
}
runtimeInfo := operator.SCtx().GetSessionVars().StmtCtx.RuntimeStatsColl
id := operator.ID()
actRows := 0.0
Expand Down
4 changes: 2 additions & 2 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1666,7 +1666,7 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
partialAgg.SetChildren(cop.indexPlan)
cop.indexPlan = partialAgg
}
cop.addCost(partialAgg.(*PhysicalStreamAgg).GetCost(inputRows, false, 0))
cop.addCost(partialAgg.(*PhysicalStreamAgg).GetCost(inputRows, false, false, 0))
partialAgg.SetCost(cop.cost())
}
t = cop.convertToRootTask(p.ctx)
Expand All @@ -1679,7 +1679,7 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task {
} else {
attachPlan2Task(p, t)
}
t.addCost(final.GetCost(inputRows, true, 0))
t.addCost(final.GetCost(inputRows, true, false, 0))
t.plan().SetCost(t.cost())
return t
}
Expand Down

0 comments on commit 1a876c9

Please sign in to comment.