Skip to content

Commit

Permalink
executor: Enhance task field for explain/explain analyze (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
windtalker authored Mar 29, 2022
1 parent a4e09f6 commit c58e005
Show file tree
Hide file tree
Showing 7 changed files with 2,115 additions and 2,084 deletions.
6 changes: 1 addition & 5 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3143,7 +3143,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
plans: v.TablePlans,
tablePlan: v.GetTablePlan(),
storeType: v.StoreType,
batchCop: v.BatchCop,
batchCop: v.ReadReqType == plannercore.BatchCop,
}
e.buildVirtualColumnInfo()
if containsLimit(dagReq.Executors) {
Expand Down Expand Up @@ -3208,17 +3208,13 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
}
})
if useMPPExecution(b.ctx, v) {
plannercore.SetMppOrBatchCopForTableScan(v.GetTablePlan())
return b.buildMPPGather(v)
}
ts, err := v.GetTableScan()
if err != nil {
b.err = err
return nil
}
if v.BatchCop {
ts.IsMPPOrBatchCop = true
}
ret, err := buildNoRangeTableReader(b, v)
if err != nil {
b.err = err
Expand Down
9 changes: 2 additions & 7 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,19 +1387,14 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, driverSide, indent st

switch x := p.(type) {
case *PhysicalTableReader:
var storeType string
switch x.StoreType {
case kv.TiKV, kv.TiFlash, kv.TiDB:
// expected do nothing
default:
return errors.Errorf("the store type %v is unknown", x.StoreType)
}
storeType = x.StoreType.Name()
taskName := "cop"
if x.BatchCop {
taskName = "batchCop"
}
err = e.explainPlanInRowFormat(x.tablePlan, taskName+"["+storeType+"]", "", childIndent, true)
taskName := x.ReadReqType.Name() + "[" + x.StoreType.Name() + "]"
err = e.explainPlanInRowFormat(x.tablePlan, taskName, "", childIndent, true)
case *PhysicalIndexReader:
err = e.explainPlanInRowFormat(x.indexPlan, "cop[tikv]", "", childIndent, true)
case *PhysicalIndexLookUpReader:
Expand Down
67 changes: 41 additions & 26 deletions planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,39 +414,54 @@ func (p PhysicalIndexMergeReader) Init(ctx sessionctx.Context, offset int) *Phys
return &p
}

// Init initializes PhysicalTableReader.
func (p PhysicalTableReader) Init(ctx sessionctx.Context, offset int) *PhysicalTableReader {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeTableReader, &p, offset)
if p.tablePlan != nil {
p.TablePlans = flattenPushDownPlan(p.tablePlan)
p.schema = p.tablePlan.Schema()
if p.StoreType == kv.TiFlash {
tableScans := p.GetTableScans()
// When PhysicalTableReader's store type is tiflash, has table scan
// and all table scans contained are not keepOrder, try to use batch cop.
if len(tableScans) > 0 {
for _, tableScan := range tableScans {
if tableScan.KeepOrder {
return &p
}
func (p *PhysicalTableReader) adjustReadReqType(ctx sessionctx.Context) {
if p.StoreType == kv.TiFlash {
_, ok := p.tablePlan.(*PhysicalExchangeSender)
if ok {
p.ReadReqType = MPP
return
}
tableScans := p.GetTableScans()
// When PhysicalTableReader's store type is tiflash, has table scan
// and all table scans contained are not keepOrder, try to use batch cop.
if len(tableScans) > 0 {
for _, tableScan := range tableScans {
if tableScan.KeepOrder {
return
}
}

// When allow batch cop is 1, only agg / topN uses batch cop.
// When allow batch cop is 2, every query uses batch cop.
switch ctx.GetSessionVars().AllowBatchCop {
case 1:
for _, plan := range p.TablePlans {
switch plan.(type) {
case *PhysicalHashAgg, *PhysicalStreamAgg, *PhysicalTopN:
p.BatchCop = true
}
// When allow batch cop is 1, only agg / topN uses batch cop.
// When allow batch cop is 2, every query uses batch cop.
switch ctx.GetSessionVars().AllowBatchCop {
case 1:
for _, plan := range p.TablePlans {
switch plan.(type) {
case *PhysicalHashAgg, *PhysicalStreamAgg, *PhysicalTopN:
p.ReadReqType = BatchCop
return
}
case 2:
p.BatchCop = true
}
case 2:
p.ReadReqType = BatchCop
}
}
}
}

// Init initializes PhysicalTableReader.
func (p PhysicalTableReader) Init(ctx sessionctx.Context, offset int) *PhysicalTableReader {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeTableReader, &p, offset)
p.ReadReqType = Cop
if p.tablePlan == nil {
return &p
}
p.TablePlans = flattenPushDownPlan(p.tablePlan)
p.schema = p.tablePlan.Schema()
p.adjustReadReqType(ctx)
if p.ReadReqType == BatchCop || p.ReadReqType == MPP {
setMppOrBatchCopForTableScan(p.tablePlan)
}
return &p
}

Expand Down
37 changes: 31 additions & 6 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,30 @@ type tableScanAndPartitionInfo struct {
partitionInfo PartitionInfo
}

type readReqType uint8

const (
// Cop means read from storage by cop request.
Cop readReqType = iota
// BatchCop means read from storage by BatchCop request, only used for TiFlash
BatchCop
// MPP means read from storage by MPP request, only used for TiFlash
MPP
)

// Name returns the name of read request type.
func (r readReqType) Name() string {
switch r {
case BatchCop:
return "batchCop"
case MPP:
return "mpp"
default:
// return cop by default
return "cop"
}
}

// PhysicalTableReader is the table reader in tidb.
type PhysicalTableReader struct {
physicalSchemaProducer
Expand All @@ -84,8 +108,9 @@ type PhysicalTableReader struct {
// StoreType indicates table read from which type of store.
StoreType kv.StoreType

// BatchCop = true means the cop task in the physical table reader will be executed in batch mode(use in TiFlash only)
BatchCop bool
// ReadReqType is the read request type for current physical table reader, there are 3 kinds of read request: Cop,
// BatchCop and MPP, currently, the latter two are only used in TiFlash
ReadReqType readReqType

IsCommonHandle bool

Expand Down Expand Up @@ -129,14 +154,14 @@ func (p *PhysicalTableReader) GetTableScan() (*PhysicalTableScan, error) {
return tableScans[0], nil
}

// SetMppOrBatchCopForTableScan set IsMPPOrBatchCop for all TableScan.
func SetMppOrBatchCopForTableScan(curPlan PhysicalPlan) {
// setMppOrBatchCopForTableScan set IsMPPOrBatchCop for all TableScan.
func setMppOrBatchCopForTableScan(curPlan PhysicalPlan) {
if ts, ok := curPlan.(*PhysicalTableScan); ok {
ts.IsMPPOrBatchCop = true
}
children := curPlan.Children()
for _, child := range children {
SetMppOrBatchCopForTableScan(child)
setMppOrBatchCopForTableScan(child)
}
}

Expand Down Expand Up @@ -173,7 +198,7 @@ func (p *PhysicalTableReader) Clone() (PhysicalPlan, error) {
}
cloned.physicalSchemaProducer = *base
cloned.StoreType = p.StoreType
cloned.BatchCop = p.BatchCop
cloned.ReadReqType = p.ReadReqType
cloned.IsCommonHandle = p.IsCommonHandle
if cloned.tablePlan, err = p.tablePlan.Clone(); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit c58e005

Please sign in to comment.