Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: Enhance task field for explain/explain analyze #33333

Merged
merged 12 commits into from
Mar 29, 2022
6 changes: 1 addition & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3143,7 +3143,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
plans: v.TablePlans,
tablePlan: v.GetTablePlan(),
storeType: v.StoreType,
batchCop: v.BatchCop,
batchCop: v.ReadReqType == plannercore.BatchCop,
}
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
Expand Down Expand Up @@ -3208,17 +3208,13 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
}
})
if useMPPExecution(b.ctx, v) {
plannercore.SetMppOrBatchCopForTableScan(v.GetTablePlan())
return b.buildMPPGather(v)
}
ts, err := v.GetTableScan()
if err != nil {
b.err = err
return nil
}
if v.BatchCop {
ts.IsMPPOrBatchCop = true
}
ret, err := buildNoRangeTableReader(b, v)
if err != nil {
b.err = err
Expand Down
9 changes: 2 additions & 7 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,19 +1387,14 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, driverSide, indent st

switch x := p.(type) {
case *PhysicalTableReader:
var storeType string
switch x.StoreType {
case kv.TiKV, kv.TiFlash, kv.TiDB:
// expected do nothing
default:
return errors.Errorf("the store type %v is unknown", x.StoreType)
}
storeType = x.StoreType.Name()
taskName := "cop"
if x.BatchCop {
taskName = "batchCop"
}
err = e.explainPlanInRowFormat(x.tablePlan, taskName+"["+storeType+"]", "", childIndent, true)
taskName := x.ReadReqType.Name() + "[" + x.StoreType.Name() + "]"
err = e.explainPlanInRowFormat(x.tablePlan, taskName, "", childIndent, true)
case *PhysicalIndexReader:
err = e.explainPlanInRowFormat(x.indexPlan, "cop[tikv]", "", childIndent, true)
case *PhysicalIndexLookUpReader:
Expand Down
67 changes: 41 additions & 26 deletions planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,39 +414,54 @@ func (p PhysicalIndexMergeReader) Init(ctx sessionctx.Context, offset int) *Phys
return &p
}

// Init initializes PhysicalTableReader.
func (p PhysicalTableReader) Init(ctx sessionctx.Context, offset int) *PhysicalTableReader {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeTableReader, &p, offset)
if p.tablePlan != nil {
p.TablePlans = flattenPushDownPlan(p.tablePlan)
p.schema = p.tablePlan.Schema()
if p.StoreType == kv.TiFlash {
tableScans := p.GetTableScans()
// When PhysicalTableReader's store type is tiflash, has table scan
// and all table scans contained are not keepOrder, try to use batch cop.
if len(tableScans) > 0 {
for _, tableScan := range tableScans {
if tableScan.KeepOrder {
return &p
}
func (p *PhysicalTableReader) adjustReadReqType(ctx sessionctx.Context) {
if p.StoreType == kv.TiFlash {
_, ok := p.tablePlan.(*PhysicalExchangeSender)
if ok {
p.ReadReqType = MPP
return
}
tableScans := p.GetTableScans()
// When PhysicalTableReader's store type is tiflash, has table scan
// and all table scans contained are not keepOrder, try to use batch cop.
if len(tableScans) > 0 {
for _, tableScan := range tableScans {
if tableScan.KeepOrder {
return
}
}

// When allow batch cop is 1, only agg / topN uses batch cop.
// When allow batch cop is 2, every query uses batch cop.
switch ctx.GetSessionVars().AllowBatchCop {
case 1:
for _, plan := range p.TablePlans {
switch plan.(type) {
case *PhysicalHashAgg, *PhysicalStreamAgg, *PhysicalTopN:
p.BatchCop = true
}
// When allow batch cop is 1, only agg / topN uses batch cop.
// When allow batch cop is 2, every query uses batch cop.
switch ctx.GetSessionVars().AllowBatchCop {
case 1:
for _, plan := range p.TablePlans {
switch plan.(type) {
case *PhysicalHashAgg, *PhysicalStreamAgg, *PhysicalTopN:
p.ReadReqType = BatchCop
return
}
case 2:
p.BatchCop = true
}
case 2:
p.ReadReqType = BatchCop
}
}
}
}

// Init initializes PhysicalTableReader.
func (p PhysicalTableReader) Init(ctx sessionctx.Context, offset int) *PhysicalTableReader {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeTableReader, &p, offset)
p.ReadReqType = Cop
if p.tablePlan == nil {
return &p
}
p.TablePlans = flattenPushDownPlan(p.tablePlan)
p.schema = p.tablePlan.Schema()
p.adjustReadReqType(ctx)
if p.ReadReqType == BatchCop || p.ReadReqType == MPP {
setMppOrBatchCopForTableScan(p.tablePlan)
}
return &p
}

Expand Down
37 changes: 31 additions & 6 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,30 @@ type tableScanAndPartitionInfo struct {
partitionInfo PartitionInfo
}

type readReqType uint8

const (
// Cop means read from storage by cop request.
Cop readReqType = iota
// BatchCop means read from storage by BatchCop request, only used for TiFlash
BatchCop
// MPP means read from storage by MPP request, only used for TiFlash
MPP
)

// Name returns the name of read request type.
func (r readReqType) Name() string {
switch r {
case BatchCop:
return "batchCop"
case MPP:
return "mpp"
default:
// return cop by default
return "cop"
}
}

// PhysicalTableReader is the table reader in tidb.
type PhysicalTableReader struct {
physicalSchemaProducer
Expand All @@ -84,8 +108,9 @@ type PhysicalTableReader struct {
// StoreType indicates table read from which type of store.
StoreType kv.StoreType

// BatchCop = true means the cop task in the physical table reader will be executed in batch mode(use in TiFlash only)
BatchCop bool
// ReadReqType is the read request type for current physical table reader, there are 3 kinds of read request: Cop,
// BatchCop and MPP, currently, the latter two are only used in TiFlash
ReadReqType readReqType

IsCommonHandle bool

Expand Down Expand Up @@ -129,14 +154,14 @@ func (p *PhysicalTableReader) GetTableScan() (*PhysicalTableScan, error) {
return tableScans[0], nil
}

// SetMppOrBatchCopForTableScan set IsMPPOrBatchCop for all TableScan.
func SetMppOrBatchCopForTableScan(curPlan PhysicalPlan) {
// setMppOrBatchCopForTableScan set IsMPPOrBatchCop for all TableScan.
func setMppOrBatchCopForTableScan(curPlan PhysicalPlan) {
if ts, ok := curPlan.(*PhysicalTableScan); ok {
ts.IsMPPOrBatchCop = true
}
children := curPlan.Children()
for _, child := range children {
SetMppOrBatchCopForTableScan(child)
setMppOrBatchCopForTableScan(child)
}
}

Expand Down Expand Up @@ -173,7 +198,7 @@ func (p *PhysicalTableReader) Clone() (PhysicalPlan, error) {
}
cloned.physicalSchemaProducer = *base
cloned.StoreType = p.StoreType
cloned.BatchCop = p.BatchCop
cloned.ReadReqType = p.ReadReqType
cloned.IsCommonHandle = p.IsCommonHandle
if cloned.tablePlan, err = p.tablePlan.Clone(); err != nil {
return nil, err
Expand Down
Loading