diff --git a/pkg/planner/core/find_best_task.go b/pkg/planner/core/find_best_task.go index df7271d4db418..0f9d7abb26b56 100644 --- a/pkg/planner/core/find_best_task.go +++ b/pkg/planner/core/find_best_task.go @@ -276,7 +276,7 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task( } // Optimize by shuffle executor to running in parallel manner. - if _, isMpp := curTask.(*mppTask); !isMpp && prop.IsSortItemEmpty() { + if _, isMpp := curTask.(*MppTask); !isMpp && prop.IsSortItemEmpty() { // Currently, we do not regard shuffled plan as a new plan. curTask = optimizeByShuffle(curTask, p.Plan.SCtx()) } @@ -357,7 +357,7 @@ func (p *LogicalSequence) iterateChildPlan( if childTask != nil && childTask.Invalid() { return nil, 0, nil, nil } - _, isMpp := childTask.(*mppTask) + _, isMpp := childTask.(*MppTask) if !isMpp && prop.IsFlashProp() { break } @@ -382,7 +382,7 @@ func (p *LogicalSequence) iterateChildPlan( return nil, 0, nil, nil } - if _, ok := lastChildTask.(*mppTask); !ok && lastChildProp.CTEProducerStatus == property.AllCTECanMpp { + if _, ok := lastChildTask.(*MppTask); !ok && lastChildProp.CTEProducerStatus == property.AllCTECanMpp { return nil, 0, nil, nil } @@ -472,7 +472,7 @@ func getTaskPlanCost(t Task, pop *coreusage.PhysicalOptimizeOp) (float64, bool, indexPartialCost += partialCost } } - case *mppTask: + case *MppTask: taskType = property.MppTaskType default: return 0, false, errors.New("unknown task type") @@ -2443,7 +2443,7 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid return invalidTask, nil } // ********************************** future deprecated end **************************/ - mppTask := &mppTask{ + mppTask := &MppTask{ p: ts, partTp: property.AnyType, tblColHists: ds.TblColHists, @@ -2695,7 +2695,7 @@ func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty, ca return rTsk } -func (ts *PhysicalTableScan) addPushedDownSelectionToMppTask(mpp *mppTask, stats *property.StatsInfo) *mppTask { +func (ts *PhysicalTableScan) addPushedDownSelectionToMppTask(mpp *MppTask, stats *property.StatsInfo) *MppTask { filterCondition, rootTaskConds := SplitSelCondsWithVirtualColumn(ts.filterCondition) var newRootConds []expression.Expression filterCondition, newRootConds = expression.PushDownExprs(GetPushDownCtx(ts.SCtx()), filterCondition, ts.StoreType) @@ -2852,7 +2852,7 @@ func (p *LogicalCTE) findBestTask(prop *property.PhysicalProperty, counter *Plan if prop.MPPPartitionTp != property.AnyType { return invalidTask, 1, nil } - t = &mppTask{ + t = &MppTask{ p: pcte, partTp: prop.MPPPartitionTp, hashCols: prop.MPPPartitionCols, diff --git a/pkg/planner/core/plan.go b/pkg/planner/core/plan.go index c5056ba73370d..364aabdfe0f7d 100644 --- a/pkg/planner/core/plan.go +++ b/pkg/planner/core/plan.go @@ -51,7 +51,7 @@ func AsSctx(pctx PlanContext) (sessionctx.Context, error) { func enforceProperty(p *property.PhysicalProperty, tsk Task, ctx PlanContext) Task { if p.TaskTp == property.MppTaskType { - mpp, ok := tsk.(*mppTask) + mpp, ok := tsk.(*MppTask) if !ok || mpp.Invalid() { return invalidTask } diff --git a/pkg/planner/core/task.go b/pkg/planner/core/task.go index 0773cc4c3f72f..f1a8f07642fa9 100644 --- a/pkg/planner/core/task.go +++ b/pkg/planner/core/task.go @@ -37,12 +37,10 @@ import ( "github.com/pingcap/tidb/pkg/util/paging" "github.com/pingcap/tidb/pkg/util/plancodec" "github.com/pingcap/tidb/pkg/util/size" - "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" ) var _ Task = &copTask{} -var _ Task = &mppTask{} // copTask is a 
task that runs in a distributed kv store. // TODO: In future, we should split copTask to indexTask and tableTask. @@ -126,7 +124,7 @@ func attachPlan2Task(p PhysicalPlan, t Task) Task { case *RootTask: p.SetChildren(v.GetPlan()) v.SetPlan(p) - case *mppTask: + case *MppTask: p.SetChildren(v.p) v.p = p } @@ -405,7 +403,7 @@ func appendExpr(p *PhysicalProjection, expr expression.Expression) *expression.C // TiFlash join require that partition key has exactly the same type, while TiDB only guarantee the partition key is the same catalog, // so if the partition key type is not exactly the same, we need add a projection below the join or exchanger if exists. -func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *mppTask) (*mppTask, *mppTask) { +func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *MppTask) (*MppTask, *MppTask) { lp := lTask.p if _, ok := lp.(*PhysicalExchangeReceiver); ok { lp = lp.Children()[0].Children()[0] @@ -470,7 +468,7 @@ func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *mppTask) (*m } // if left or right child changes, we need to add enforcer. if lChanged { - nlTask := lTask.Copy().(*mppTask) + nlTask := lTask.Copy().(*MppTask) nlTask.p = lProj nlTask = nlTask.enforceExchanger(&property.PhysicalProperty{ TaskTp: property.MppTaskType, @@ -480,7 +478,7 @@ func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *mppTask) (*m lTask = nlTask } if rChanged { - nrTask := rTask.Copy().(*mppTask) + nrTask := rTask.Copy().(*MppTask) nrTask.p = rProj nrTask = nrTask.enforceExchanger(&property.PhysicalProperty{ TaskTp: property.MppTaskType, @@ -493,8 +491,8 @@ func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *mppTask) (*m } func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...Task) Task { - lTask, lok := tasks[0].(*mppTask) - rTask, rok := tasks[1].(*mppTask) + lTask, lok := tasks[0].(*MppTask) + rTask, rok := tasks[1].(*MppTask) if !lok || !rok { return invalidTask } @@ -525,7 +523,7 @@ func (p *PhysicalHashJoin) attach2TaskForMpp(tasks ...Task) Task { if outerTaskIndex == 1 { outerTask = rTask } - task := &mppTask{ + task := &MppTask{ p: p, partTp: outerTask.partTp, hashCols: outerTask.hashCols, @@ -901,12 +899,12 @@ func (p *PhysicalLimit) Attach2Task(tasks ...Task) Task { // Whatever the remained case is, we directly convert to it to root task. t = cop.ConvertToRootTask(p.SCtx()) } - } else if mpp, ok := t.(*mppTask); ok { + } else if mpp, ok := t.(*MppTask); ok { newCount := p.Offset + p.Count childProfile := mpp.Plan().StatsInfo() stats := deriveLimitStats(childProfile, float64(newCount)) pushedDownLimit := PhysicalLimit{Count: newCount, PartitionBy: newPartitionBy}.Init(p.SCtx(), stats, p.QueryBlockOffset()) - mpp = attachPlan2Task(pushedDownLimit, mpp).(*mppTask) + mpp = attachPlan2Task(pushedDownLimit, mpp).(*MppTask) pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema()) t = mpp.ConvertToRootTask(p.SCtx()) } @@ -1138,7 +1136,7 @@ func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool { } // canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash. 
-func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *mppTask) bool { +func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *MppTask) bool { if !p.canExpressionConvertedToPB(kv.TiFlash) { return false } @@ -1169,7 +1167,7 @@ func (p *PhysicalTopN) Attach2Task(tasks ...Task) Task { pushedDownTopN = p.getPushedDownTopN(copTask.tablePlan) copTask.tablePlan = pushedDownTopN } - } else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDownToTiFlash(mppTask) { + } else if mppTask, ok := t.(*MppTask); ok && needPushDown && p.canPushDownToTiFlash(mppTask) { pushedDownTopN := p.getPushedDownTopN(mppTask.p) mppTask.p = pushedDownTopN } @@ -1186,7 +1184,7 @@ func (p *PhysicalTopN) Attach2Task(tasks ...Task) Task { func (p *PhysicalExpand) Attach2Task(tasks ...Task) Task { t := tasks[0].Copy() // current expand can only be run in MPP TiFlash mode. - if mpp, ok := t.(*mppTask); ok { + if mpp, ok := t.(*MppTask); ok { p.SetChildren(mpp.p) mpp.p = p return mpp @@ -1202,7 +1200,7 @@ func (p *PhysicalProjection) Attach2Task(tasks ...Task) Task { copTask := attachPlan2Task(p, cop) return copTask } - } else if mpp, ok := t.(*mppTask); ok { + } else if mpp, ok := t.(*MppTask); ok { if expression.CanExprsPushDown(GetPushDownCtx(p.SCtx()), p.Exprs, kv.TiFlash) { p.SetChildren(mpp.p) mpp.p = p @@ -1218,10 +1216,10 @@ func (p *PhysicalProjection) Attach2Task(tasks ...Task) Task { } func (p *PhysicalUnionAll) attach2MppTasks(tasks ...Task) Task { - t := &mppTask{p: p} + t := &MppTask{p: p} childPlans := make([]PhysicalPlan, 0, len(tasks)) for _, tk := range tasks { - if mpp, ok := tk.(*mppTask); ok && !tk.Invalid() { + if mpp, ok := tk.(*MppTask); ok && !tk.Invalid() { childPlans = append(childPlans, mpp.Plan()) } else if root, ok := tk.(*RootTask); ok && root.IsEmpty() { continue @@ -1239,7 +1237,7 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...Task) Task { // Attach2Task implements PhysicalPlan interface. func (p *PhysicalUnionAll) Attach2Task(tasks ...Task) Task { for _, t := range tasks { - if _, ok := t.(*mppTask); ok { + if _, ok := t.(*MppTask); ok { if p.TP() == plancodec.TypePartitionUnion { // In attach2MppTasks(), will attach PhysicalUnion to mppTask directly. // But PartitionUnion cannot pushdown to tiflash, so here disable PartitionUnion pushdown to tiflash explicitly. @@ -1262,7 +1260,7 @@ func (p *PhysicalUnionAll) Attach2Task(tasks ...Task) Task { // Attach2Task implements PhysicalPlan interface. func (sel *PhysicalSelection) Attach2Task(tasks ...Task) Task { - if mppTask, _ := tasks[0].(*mppTask); mppTask != nil { // always push to mpp task. + if mppTask, _ := tasks[0].(*MppTask); mppTask != nil { // always push to mpp task. 
if expression.CanExprsPushDown(GetPushDownCtx(sel.SCtx()), sel.Conditions, kv.TiFlash) { return attachPlan2Task(sel, mppTask.Copy()) } @@ -1955,7 +1953,7 @@ func (p *PhysicalStreamAgg) Attach2Task(tasks ...Task) Task { t = cop.ConvertToRootTask(p.SCtx()) attachPlan2Task(finalAgg, t) } - } else if mpp, ok := t.(*mppTask); ok { + } else if mpp, ok := t.(*MppTask); ok { t = mpp.ConvertToRootTask(p.SCtx()) attachPlan2Task(p, t) } else { @@ -1982,7 +1980,7 @@ func (p *PhysicalHashAgg) cpuCostDivisor(hasDistinct bool) (divisor, con float64 return math.Min(float64(finalCon), float64(partialCon)), float64(finalCon + partialCon) } -func (p *PhysicalHashAgg) attach2TaskForMpp1Phase(mpp *mppTask) Task { +func (p *PhysicalHashAgg) attach2TaskForMpp1Phase(mpp *MppTask) Task { // 1-phase agg: when the partition columns can be satisfied, where the plan does not need to enforce Exchange // only push down the original agg proj := p.convertAvgForMPP() @@ -2109,7 +2107,7 @@ func (p *PhysicalHashAgg) scaleStats4GroupingSets(groupingSets expression.Groupi // +- Expand {}, {} -> expand // +- TableScan foo func (p *PhysicalHashAgg) adjust3StagePhaseAgg(partialAgg, finalAgg PhysicalPlan, canUse3StageAgg bool, - groupingSets expression.GroupingSets, mpp *mppTask) (final, mid, part, proj4Part PhysicalPlan, _ error) { + groupingSets expression.GroupingSets, mpp *MppTask) (final, mid, part, proj4Part PhysicalPlan, _ error) { if !(partialAgg != nil && canUse3StageAgg) { // quick path: return the original finalAgg and partiAgg. return finalAgg, nil, partialAgg, nil, nil @@ -2294,7 +2292,7 @@ func (p *PhysicalHashAgg) adjust3StagePhaseAgg(partialAgg, finalAgg PhysicalPlan func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...Task) Task { t := tasks[0].Copy() - mpp, ok := t.(*mppTask) + mpp, ok := t.(*MppTask) if !ok { return invalidTask } @@ -2455,7 +2453,7 @@ func (p *PhysicalHashAgg) Attach2Task(tasks ...Task) Task { t = cop.ConvertToRootTask(p.SCtx()) attachPlan2Task(p, t) } - } else if _, ok := t.(*mppTask); ok { + } else if _, ok := t.(*MppTask); ok { return p.attach2TaskForMpp(tasks...) } else { attachPlan2Task(p, t) @@ -2463,7 +2461,7 @@ func (p *PhysicalHashAgg) Attach2Task(tasks ...Task) Task { return t } -func (p *PhysicalWindow) attach2TaskForMPP(mpp *mppTask) Task { +func (p *PhysicalWindow) attach2TaskForMPP(mpp *MppTask) Task { // FIXME: currently, tiflash's join has different schema with TiDB, // so we have to rebuild the schema of join and operators which may inherit schema from join. // for window, we take the sub-plan's schema, and the schema generated by windowDescs. @@ -2481,7 +2479,7 @@ func (p *PhysicalWindow) attach2TaskForMPP(mpp *mppTask) Task { // Attach2Task implements the PhysicalPlan interface. func (p *PhysicalWindow) Attach2Task(tasks ...Task) Task { - if mpp, ok := tasks[0].Copy().(*mppTask); ok && p.storeTp == kv.TiFlash { + if mpp, ok := tasks[0].Copy().(*MppTask); ok && p.storeTp == kv.TiFlash { return p.attach2TaskForMPP(mpp) } t := tasks[0].ConvertToRootTask(p.SCtx()) @@ -2491,9 +2489,9 @@ func (p *PhysicalWindow) Attach2Task(tasks ...Task) Task { // Attach2Task implements the PhysicalPlan interface. 
func (p *PhysicalCTEStorage) Attach2Task(tasks ...Task) Task { t := tasks[0].Copy() - if mpp, ok := t.(*mppTask); ok { + if mpp, ok := t.(*MppTask); ok { p.SetChildren(t.Plan()) - return &mppTask{ + return &MppTask{ p: p, partTp: mpp.partTp, hashCols: mpp.hashCols, @@ -2510,13 +2508,13 @@ func (p *PhysicalCTEStorage) Attach2Task(tasks ...Task) Task { // Attach2Task implements the PhysicalPlan interface. func (p *PhysicalSequence) Attach2Task(tasks ...Task) Task { for _, t := range tasks { - _, isMpp := t.(*mppTask) + _, isMpp := t.(*MppTask) if !isMpp { return tasks[len(tasks)-1] } } - lastTask := tasks[len(tasks)-1].(*mppTask) + lastTask := tasks[len(tasks)-1].(*MppTask) children := make([]PhysicalPlan, 0, len(tasks)) for _, t := range tasks { @@ -2525,7 +2523,7 @@ func (p *PhysicalSequence) Attach2Task(tasks ...Task) Task { p.SetChildren(children...) - mppTask := &mppTask{ + mppTask := &MppTask{ p: p, partTp: lastTask.partTp, hashCols: lastTask.hashCols, @@ -2534,70 +2532,6 @@ func (p *PhysicalSequence) Attach2Task(tasks ...Task) Task { return mppTask } -// mppTask can not : -// 1. keep order -// 2. support double read -// 3. consider virtual columns. -// 4. TODO: partition prune after close -type mppTask struct { - p PhysicalPlan - - partTp property.MPPPartitionType - hashCols []*property.MPPPartitionColumn - - // rootTaskConds record filters of TableScan that cannot be pushed down to TiFlash. - - // For logical plan like: HashAgg -> Selection -> TableScan, if filters in Selection cannot be pushed to TiFlash. - // Planner will generate physical plan like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> PhysicalTableScan(cop tiflash) - // Because planner will make mppTask invalid directly then use copTask directly. - - // But in DisaggregatedTiFlash mode, cop and batchCop protocol is disabled, so we have to consider this situation for mppTask. - // When generating PhysicalTableScan, if prop.TaskTp is RootTaskType, mppTask will be converted to rootTask, - // and filters in rootTaskConds will be added in a Selection which will be executed in TiDB. - // So physical plan be like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> ExchangeSender -> PhysicalTableScan(mpp tiflash) - rootTaskConds []expression.Expression - tblColHists *statistics.HistColl -} - -// Count implements Task interface. -func (t *mppTask) Count() float64 { - return t.p.StatsInfo().RowCount -} - -// Copy implements Task interface. -func (t *mppTask) Copy() Task { - nt := *t - return &nt -} - -// Plan implements Task interface. -func (t *mppTask) Plan() PhysicalPlan { - return t.p -} - -// Invalid implements Task interface. -func (t *mppTask) Invalid() bool { - return t.p == nil -} - -// ConvertToRootTask implements Task interface. -func (t *mppTask) ConvertToRootTask(ctx PlanContext) *RootTask { - return t.Copy().(*mppTask).ConvertToRootTaskImpl(ctx) -} - -// MemoryUsage return the memory usage of mppTask -func (t *mppTask) MemoryUsage() (sum int64) { - if t == nil { - return - } - - sum = size.SizeOfInterface + size.SizeOfInt + size.SizeOfSlice + int64(cap(t.hashCols))*size.SizeOfPointer - if t.p != nil { - sum += t.p.MemoryUsage() - } - return -} - func collectPartitionInfosFromMPPPlan(p *PhysicalTableReader, mppPlan PhysicalPlan) { switch x := mppPlan.(type) { case *PhysicalTableScan: @@ -2636,52 +2570,7 @@ func tryExpandVirtualColumn(p PhysicalPlan) { } } -func (t *mppTask) ConvertToRootTaskImpl(ctx PlanContext) *RootTask { - // In disaggregated-tiflash mode, need to consider generated column. 
- tryExpandVirtualColumn(t.p) - sender := PhysicalExchangeSender{ - ExchangeType: tipb.ExchangeType_PassThrough, - }.Init(ctx, t.p.StatsInfo()) - sender.SetChildren(t.p) - - p := PhysicalTableReader{ - tablePlan: sender, - StoreType: kv.TiFlash, - }.Init(ctx, t.p.QueryBlockOffset()) - p.SetStats(t.p.StatsInfo()) - collectPartitionInfosFromMPPPlan(p, t.p) - rt := &RootTask{} - rt.SetPlan(p) - - if len(t.rootTaskConds) > 0 { - // Some Filter cannot be pushed down to TiFlash, need to add Selection in rootTask, - // so this Selection will be executed in TiDB. - _, isTableScan := t.p.(*PhysicalTableScan) - _, isSelection := t.p.(*PhysicalSelection) - if isSelection { - _, isTableScan = t.p.Children()[0].(*PhysicalTableScan) - } - if !isTableScan { - // Need to make sure oriTaskPlan is TableScan, because rootTaskConds is part of TableScan.FilterCondition. - // It's only for TableScan. This is ensured by converting mppTask to rootTask just after TableScan is built, - // so no other operators are added into this mppTask. - logutil.BgLogger().Error("expect Selection or TableScan for mppTask.p", zap.String("mppTask.p", t.p.TP())) - return invalidTask - } - selectivity, _, err := cardinality.Selectivity(ctx, t.tblColHists, t.rootTaskConds, nil) - if err != nil { - logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err)) - selectivity = SelectionFactor - } - sel := PhysicalSelection{Conditions: t.rootTaskConds}.Init(ctx, rt.GetPlan().StatsInfo().Scale(selectivity), rt.GetPlan().QueryBlockOffset()) - sel.fromDataSource = true - sel.SetChildren(rt.GetPlan()) - rt.SetPlan(sel) - } - return rt -} - -func (t *mppTask) needEnforceExchanger(prop *property.PhysicalProperty) bool { +func (t *MppTask) needEnforceExchanger(prop *property.PhysicalProperty) bool { switch prop.MPPPartitionTp { case property.AnyType: return false @@ -2708,19 +2597,19 @@ func (t *mppTask) needEnforceExchanger(prop *property.PhysicalProperty) bool { } } -func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask { +func (t *MppTask) enforceExchanger(prop *property.PhysicalProperty) *MppTask { if !t.needEnforceExchanger(prop) { return t } - return t.Copy().(*mppTask).enforceExchangerImpl(prop) + return t.Copy().(*MppTask).enforceExchangerImpl(prop) } -func (t *mppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *mppTask { +func (t *MppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *MppTask { if collate.NewCollationEnabled() && !t.p.SCtx().GetSessionVars().HashExchangeWithNewCollation && prop.MPPPartitionTp == property.HashType { for _, col := range prop.MPPPartitionCols { if types.IsString(col.Col.RetType.GetType()) { t.p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because when `new_collation_enabled` is true, HashJoin or HashAgg with string key is not supported now.") - return &mppTask{} + return &MppTask{} } } } @@ -2737,7 +2626,7 @@ func (t *mppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *mppTask sender.SetChildren(t.p) receiver := PhysicalExchangeReceiver{}.Init(ctx, t.p.StatsInfo()) receiver.SetChildren(sender) - return &mppTask{ + return &MppTask{ p: receiver, partTp: prop.MPPPartitionTp, hashCols: prop.MPPPartitionCols, diff --git a/pkg/planner/core/task_base.go b/pkg/planner/core/task_base.go index 0021411e65a4a..afe70e001fe7b 100644 --- a/pkg/planner/core/task_base.go +++ b/pkg/planner/core/task_base.go @@ -15,11 +15,20 @@ package core import ( + "github.com/pingcap/tidb/pkg/expression" + 
"github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/planner/cardinality" + "github.com/pingcap/tidb/pkg/planner/property" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" + "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" ) var ( _ Task = &RootTask{} + _ Task = &MppTask{} ) // Task is a new version of `PhysicalPlanInfo`. It stores cost information for a task. @@ -104,3 +113,113 @@ func (t *RootTask) MemoryUsage() (sum int64) { } return sum } + +// MppTask can not : +// 1. keep order +// 2. support double read +// 3. consider virtual columns. +// 4. TODO: partition prune after close +type MppTask struct { + p PhysicalPlan + + partTp property.MPPPartitionType + hashCols []*property.MPPPartitionColumn + + // rootTaskConds record filters of TableScan that cannot be pushed down to TiFlash. + + // For logical plan like: HashAgg -> Selection -> TableScan, if filters in Selection cannot be pushed to TiFlash. + // Planner will generate physical plan like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> PhysicalTableScan(cop tiflash) + // Because planner will make mppTask invalid directly then use copTask directly. + + // But in DisaggregatedTiFlash mode, cop and batchCop protocol is disabled, so we have to consider this situation for mppTask. + // When generating PhysicalTableScan, if prop.TaskTp is RootTaskType, mppTask will be converted to rootTask, + // and filters in rootTaskConds will be added in a Selection which will be executed in TiDB. + // So physical plan be like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> ExchangeSender -> PhysicalTableScan(mpp tiflash) + rootTaskConds []expression.Expression + tblColHists *statistics.HistColl +} + +// Count implements Task interface. +func (t *MppTask) Count() float64 { + return t.p.StatsInfo().RowCount +} + +// Copy implements Task interface. +func (t *MppTask) Copy() Task { + nt := *t + return &nt +} + +// Plan implements Task interface. +func (t *MppTask) Plan() PhysicalPlan { + return t.p +} + +// Invalid implements Task interface. +func (t *MppTask) Invalid() bool { + return t.p == nil +} + +// ConvertToRootTask implements Task interface. +func (t *MppTask) ConvertToRootTask(ctx PlanContext) *RootTask { + return t.Copy().(*MppTask).ConvertToRootTaskImpl(ctx) +} + +// MemoryUsage return the memory usage of mppTask +func (t *MppTask) MemoryUsage() (sum int64) { + if t == nil { + return + } + + sum = size.SizeOfInterface + size.SizeOfInt + size.SizeOfSlice + int64(cap(t.hashCols))*size.SizeOfPointer + if t.p != nil { + sum += t.p.MemoryUsage() + } + return +} + +// ConvertToRootTaskImpl implements Task interface. +func (t *MppTask) ConvertToRootTaskImpl(ctx PlanContext) *RootTask { + // In disaggregated-tiflash mode, need to consider generated column. + tryExpandVirtualColumn(t.p) + sender := PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_PassThrough, + }.Init(ctx, t.p.StatsInfo()) + sender.SetChildren(t.p) + + p := PhysicalTableReader{ + tablePlan: sender, + StoreType: kv.TiFlash, + }.Init(ctx, t.p.QueryBlockOffset()) + p.SetStats(t.p.StatsInfo()) + collectPartitionInfosFromMPPPlan(p, t.p) + rt := &RootTask{} + rt.SetPlan(p) + + if len(t.rootTaskConds) > 0 { + // Some Filter cannot be pushed down to TiFlash, need to add Selection in rootTask, + // so this Selection will be executed in TiDB. 
+ _, isTableScan := t.p.(*PhysicalTableScan) + _, isSelection := t.p.(*PhysicalSelection) + if isSelection { + _, isTableScan = t.p.Children()[0].(*PhysicalTableScan) + } + if !isTableScan { + // Need to make sure oriTaskPlan is TableScan, because rootTaskConds is part of TableScan.FilterCondition. + // It's only for TableScan. This is ensured by converting mppTask to rootTask just after TableScan is built, + // so no other operators are added into this mppTask. + logutil.BgLogger().Error("expect Selection or TableScan for mppTask.p", zap.String("mppTask.p", t.p.TP())) + return invalidTask + } + selectivity, _, err := cardinality.Selectivity(ctx, t.tblColHists, t.rootTaskConds, nil) + if err != nil { + logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err)) + selectivity = SelectionFactor + } + sel := PhysicalSelection{Conditions: t.rootTaskConds}.Init(ctx, rt.GetPlan().StatsInfo().Scale(selectivity), rt.GetPlan().QueryBlockOffset()) + sel.fromDataSource = true + sel.SetChildren(rt.GetPlan()) + rt.SetPlan(sel) + } + return rt +}
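
Note on the change above: the patch is a mechanical rename of the unexported mppTask type to the exported MppTask, relocating the type, its Task-interface methods, and ConvertToRootTaskImpl from task.go to task_base.go (the go-tipb import moves with it); the method bodies are unchanged. For readers less familiar with the pattern, the standalone Go sketch below is illustrative only: Task, RootTask, and MppTask here are simplified stand-ins, not the real planner definitions. It shows the two idioms the rewritten call sites rely on, the compile-time interface assertion `var _ Task = &MppTask{}` and type switches on the concrete task kind.

package main

import "fmt"

// Task mirrors, in highly simplified form, the role of the planner's Task
// interface: every task kind reports a row count and whether it is valid.
// These names are illustrative stand-ins, not the TiDB definitions.
type Task interface {
	Count() float64
	Invalid() bool
}

// RootTask and MppTask are simplified stand-ins for the two concrete task
// kinds touched by the patch.
type RootTask struct{ rows float64 }

func (t *RootTask) Count() float64 { return t.rows }
func (t *RootTask) Invalid() bool  { return false }

type MppTask struct{ rows float64 }

func (t *MppTask) Count() float64 { return t.rows }
func (t *MppTask) Invalid() bool  { return t.rows < 0 }

// Compile-time checks in the same style as `var _ Task = &MppTask{}` in
// task_base.go: the build fails if either type stops satisfying Task.
var (
	_ Task = &RootTask{}
	_ Task = &MppTask{}
)

// describe type-switches on the concrete task kind, the same shape as the
// `case *MppTask:` branches rewritten throughout task.go and find_best_task.go.
func describe(t Task) string {
	switch v := t.(type) {
	case *MppTask:
		return fmt.Sprintf("mpp task, %.0f rows", v.Count())
	case *RootTask:
		return fmt.Sprintf("root task, %.0f rows", v.Count())
	default:
		return "unknown task type"
	}
}

func main() {
	fmt.Println(describe(&MppTask{rows: 1024}))
	fmt.Println(describe(&RootTask{rows: 10}))
}

Because Go type assertions such as `t.(*MppTask)` only need the concrete type to be visible at the assertion site, exporting the type and defining it next to RootTask in task_base.go lets both task kinds share the same file-level compile-time checks without changing any call-site behavior.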