Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: Revert tidb_allow_mpp modification for downgrade compatibility and add warnings for enforce mpp. #25302

Merged
merged 20 commits into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (

func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
if prop.IsFlashProp() {
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"Can't use mpp mode because operator `UnionScan` is not supported now.")
return nil, true, nil
}
childProp := prop.CloneEssentialFields()
Expand Down Expand Up @@ -1687,7 +1689,7 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P
if prop.IsFlashProp() && ((p.preferJoinType&preferBCJoin) == 0 && p.preferJoinType > 0) {
return nil, false, nil
}
if prop.PartitionTp == property.BroadcastType {
if prop.MPPPartitionTp == property.BroadcastType {
return nil, false, nil
}
joins := make([]PhysicalPlan, 0, 8)
Expand Down Expand Up @@ -1785,7 +1787,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
return nil
}

if prop.PartitionTp == property.BroadcastType {
if prop.MPPPartitionTp == property.BroadcastType {
return nil
}
if !canExprsInJoinPushdown(p, kv.TiFlash) {
Expand Down Expand Up @@ -1828,27 +1830,27 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
baseJoin.InnerChildIdx = preferredBuildIndex
childrenProps := make([]*property.PhysicalProperty, 2)
if useBCJ {
childrenProps[preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.BroadcastType, CanAddEnforcer: true}
childrenProps[preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.BroadcastType, CanAddEnforcer: true}
expCnt := math.MaxFloat64
if prop.ExpectedCnt < p.stats.RowCount {
expCntScale := prop.ExpectedCnt / p.stats.RowCount
expCnt = p.children[1-preferredBuildIndex].statsInfo().RowCount * expCntScale
}
if prop.PartitionTp == property.HashType {
if prop.MPPPartitionTp == property.HashType {
hashKeys := rkeys
if preferredBuildIndex == 1 {
hashKeys = lkeys
}
if matches := prop.IsSubsetOf(hashKeys); len(matches) != 0 {
childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, PartitionTp: property.HashType, PartitionCols: prop.PartitionCols}
childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, MPPPartitionTp: property.HashType, MPPPartitionCols: prop.MPPPartitionCols}
} else {
return nil
}
} else {
childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, PartitionTp: property.AnyType}
childrenProps[1-preferredBuildIndex] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: expCnt, MPPPartitionTp: property.AnyType}
}
} else {
if prop.PartitionTp == property.HashType {
if prop.MPPPartitionTp == property.HashType {
var matches []int
if matches = prop.IsSubsetOf(lkeys); len(matches) == 0 {
matches = prop.IsSubsetOf(rkeys)
Expand All @@ -1859,8 +1861,8 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
lkeys = chooseSubsetOfJoinKeys(lkeys, matches)
rkeys = chooseSubsetOfJoinKeys(rkeys, matches)
}
childrenProps[0] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.HashType, PartitionCols: lkeys, CanAddEnforcer: true}
childrenProps[1] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.HashType, PartitionCols: rkeys, CanAddEnforcer: true}
childrenProps[0] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: lkeys, CanAddEnforcer: true}
childrenProps[1] = &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: rkeys, CanAddEnforcer: true}
}
join := PhysicalHashJoin{
basePhysicalJoin: baseJoin,
Expand Down Expand Up @@ -2096,6 +2098,8 @@ func (la *LogicalApply) GetHashJoin(prop *property.PhysicalProperty) *PhysicalHa

func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
if !prop.AllColsFromSchema(la.children[0].Schema()) || prop.IsFlashProp() { // for convenient, we don't pass through any prop
la.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"Can't use mpp mode because operator `Apply` is not supported now.")
return nil, true, nil
}
disableAggPushDownToCop(la.children[0])
Expand Down Expand Up @@ -2142,6 +2146,8 @@ func disableAggPushDownToCop(p LogicalPlan) {
}

func (p *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"Can't use mpp mode because operator `Window` is not supported now.")
if prop.IsFlashProp() {
return nil, true, nil
}
Expand Down Expand Up @@ -2199,13 +2205,15 @@ func (p *baseLogicalPlan) canPushToCopImpl(storeTp kv.StoreType, considerDual bo
}
case *LogicalTableDual:
return storeTp == kv.TiFlash && considerDual
case *LogicalAggregation, *LogicalSelection, *LogicalJoin:
case *LogicalAggregation, *LogicalSelection, *LogicalJoin, *LogicalLimit, *LogicalTopN:
LittleFall marked this conversation as resolved.
Show resolved Hide resolved
if storeTp == kv.TiFlash {
ret = ret && c.canPushToCop(storeTp)
} else {
return false
}
default:
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"Can't use mpp mode because operator `" + c.TP() + "` is not supported now.")
return false
}
}
Expand Down Expand Up @@ -2363,13 +2371,13 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert
if prop.TaskTp != property.RootTaskType && prop.TaskTp != property.MppTaskType {
return nil
}
if prop.PartitionTp == property.BroadcastType {
if prop.MPPPartitionTp == property.BroadcastType {
return nil
}
if len(la.GroupByItems) > 0 {
partitionCols := la.GetGroupByCols()
// trying to match the required parititions.
if prop.PartitionTp == property.HashType {
if prop.MPPPartitionTp == property.HashType {
if matches := prop.IsSubsetOf(partitionCols); len(matches) != 0 {
partitionCols = chooseSubsetOfJoinKeys(partitionCols, matches)
} else {
Expand All @@ -2382,15 +2390,15 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert
// If there are no available partition cols, but still have group by items, that means group by items are all expressions or constants.
// To avoid mess, we don't do any one-phase aggregation in this case.
if len(partitionCols) != 0 {
childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.HashType, PartitionCols: partitionCols, CanAddEnforcer: true}
childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols, CanAddEnforcer: true}
agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
agg.SetSchema(la.schema.Clone())
agg.MppRunMode = Mpp1Phase
hashAggs = append(hashAggs, agg)
}

// 2-phase agg
childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.AnyType}
childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.AnyType}
agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
agg.SetSchema(la.schema.Clone())
agg.MppRunMode = Mpp2Phase
Expand Down Expand Up @@ -2429,7 +2437,7 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
canPushDownToMPP := la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() && canPushDownToTiFlash
canPushDownToMPP := canPushDownToTiFlash && la.ctx.GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP()
if la.HasDistinct() {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
Expand Down Expand Up @@ -2557,6 +2565,8 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]

func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
if prop.IsFlashProp() {
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"Can't use mpp mode because operator `Lock` is not supported now.")
return nil, true, nil
}
childProp := prop.CloneEssentialFields()
Expand All @@ -2574,7 +2584,7 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty)
return nil, true, nil
}
// TODO: UnionAll can pass partition info, but for briefness, we prevent it from pushing down.
if prop.TaskTp == property.MppTaskType && prop.PartitionTp != property.AnyType {
if prop.TaskTp == property.MppTaskType && prop.MPPPartitionTp != property.AnyType {
return nil, true, nil
}
canUseMpp := p.ctx.GetSessionVars().IsMPPAllowed() && p.canPushToCopImpl(kv.TiFlash, true)
Expand Down Expand Up @@ -2650,6 +2660,7 @@ func (ls *LogicalSort) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]

func (p *LogicalMaxOneRow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool, error) {
if !prop.IsEmpty() || prop.IsFlashProp() {
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("Can't use mpp mode because operator `MaxOneRow` is not supported now.")
return nil, true, nil
}
mor := PhysicalMaxOneRow{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2})
Expand Down
12 changes: 7 additions & 5 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty, planCoun
// try to get the task with an enforced sort.
newProp.SortItems = []property.SortItem{}
newProp.ExpectedCnt = math.MaxFloat64
newProp.PartitionCols = nil
newProp.PartitionTp = property.AnyType
newProp.MPPPartitionCols = nil
newProp.MPPPartitionTp = property.AnyType
var hintCanWork bool
plansNeedEnforce, hintCanWork, err = p.self.exhaustPhysicalPlans(newProp)
if err != nil {
Expand Down Expand Up @@ -644,8 +644,8 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
}
// Next, get the bestTask with enforced prop
prop.SortItems = []property.SortItem{}
prop.PartitionTp = property.AnyType
} else if prop.PartitionTp != property.AnyType {
prop.MPPPartitionTp = property.AnyType
} else if prop.MPPPartitionTp != property.AnyType {
return invalidTask, 0, nil
}
defer func() {
Expand Down Expand Up @@ -1546,12 +1546,14 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
if ts.KeepOrder {
return &mppTask{}, nil
}
if prop.PartitionTp != property.AnyType || ts.isPartition {
if prop.MPPPartitionTp != property.AnyType || ts.isPartition {
// If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution.
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("Can't use mpp mode because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
return &mppTask{}, nil
}
for _, col := range ts.schema.Columns {
if col.VirtualExpr != nil {
ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("Can't use mpp mode because column `" + col.OrigName + "` is a virtual column which is not supported now.")
return &mppTask{}, nil
}
}
Expand Down
2 changes: 2 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
ds.DBName.O, ds.table.Meta().Name.O, kv.TiKV.Name(), ds.ctx.GetSessionVars().GetIsolationReadEngines())
warning := ErrInternal.GenWithStack(errMsg)
ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
} else {
ds.ctx.GetSessionVars().RaiseWarningWhenMPPEnforced("Can't use mpp mode because you have set a hint to read table `" + hintTbl.tblName.O + "` from TiKV.")
}
}
if hintTbl := hintInfo.ifPreferTiFlash(alias); hintTbl != nil {
Expand Down
14 changes: 12 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,10 +973,16 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i
tablePath := &util.AccessPath{StoreType: tp}
fillContentForTablePath(tablePath, tblInfo)
publicPaths = append(publicPaths, tablePath)
if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available {

if tblInfo.TiFlashReplica == nil {
ctx.GetSessionVars().RaiseWarningWhenMPPEnforced("Can't use mpp mode because there aren't tiflash replicas of table `" + tblInfo.Name.O + "`.")
} else if !tblInfo.TiFlashReplica.Available {
ctx.GetSessionVars().RaiseWarningWhenMPPEnforced("Can't use mpp mode because tiflash replicas of table `" + tblInfo.Name.O + "` not ready.")
} else {
publicPaths = append(publicPaths, genTiFlashPath(tblInfo, false))
publicPaths = append(publicPaths, genTiFlashPath(tblInfo, true))
}

optimizerUseInvisibleIndexes := ctx.GetSessionVars().OptimizerUseInvisibleIndexes

check = check && ctx.GetSessionVars().ConnectionID > 0
Expand Down Expand Up @@ -1110,11 +1116,15 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
}
}
var err error
engineVals, _ := ctx.GetSessionVars().GetSystemVar(variable.TiDBIsolationReadEngines)
if len(paths) == 0 {
engineVals, _ := ctx.GetSessionVars().GetSystemVar(variable.TiDBIsolationReadEngines)
err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("Can not find access path matching '%v'(value: '%v'). Available values are '%v'.",
variable.TiDBIsolationReadEngines, engineVals, availableEngineStr))
}
if _, ok := isolationReadEngines[kv.TiFlash]; !ok {
ctx.GetSessionVars().RaiseWarningWhenMPPEnforced(
fmt.Sprintf("Can't use mpp mode because '%v'(value: '%v') not match, need 'tiflash'.", variable.TiDBIsolationReadEngines, engineVals))
}
return paths, err
}

Expand Down
40 changes: 22 additions & 18 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,9 +728,9 @@ func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *mppTask) (*m
nlTask := lTask.copy().(*mppTask)
nlTask.p = lProj
nlTask = nlTask.enforceExchangerImpl(&property.PhysicalProperty{
TaskTp: property.MppTaskType,
PartitionTp: property.HashType,
PartitionCols: lPartKeys,
TaskTp: property.MppTaskType,
MPPPartitionTp: property.HashType,
MPPPartitionCols: lPartKeys,
})
nlTask.cst = lTask.cst
lProj.cost = nlTask.cst
Expand All @@ -740,9 +740,9 @@ func (p *PhysicalHashJoin) convertPartitionKeysIfNeed(lTask, rTask *mppTask) (*m
nrTask := rTask.copy().(*mppTask)
nrTask.p = rProj
nrTask = nrTask.enforceExchangerImpl(&property.PhysicalProperty{
TaskTp: property.MppTaskType,
PartitionTp: property.HashType,
PartitionCols: rPartKeys,
TaskTp: property.MppTaskType,
MPPPartitionTp: property.HashType,
MPPPartitionCols: rPartKeys,
})
nrTask.cst = rTask.cst
rProj.cost = nrTask.cst
Expand Down Expand Up @@ -1404,6 +1404,8 @@ func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFunc
for _, aggFunc := range aggFuncs {
// if the aggFunc contain VirtualColumn or CorrelatedColumn, it can not be pushed down.
if expression.ContainVirtualColumn(aggFunc.Args) || expression.ContainCorrelatedColumn(aggFunc.Args) {
sctx.GetSessionVars().RaiseWarningWhenMPPEnforced(
"Can't use mpp mode because expressions of aggFunc `" + aggFunc.Name + "` contain virtual column or correlated column, which is not supported now.")
return false
}
pb := aggregation.AggFuncToPBExpr(sc, client, aggFunc)
Expand All @@ -1425,6 +1427,7 @@ func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFunc
}
}
if expression.ContainVirtualColumn(groupByItems) {
sctx.GetSessionVars().RaiseWarningWhenMPPEnforced("Can't use mpp mode because groupByItems contain virtual column, which is not supported now.")
return false
}
return expression.CanExprsPushDown(sc, groupByItems, client, storeType)
Expand Down Expand Up @@ -1911,7 +1914,7 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
}
}
partialAgg.SetCost(mpp.cost())
prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.HashType, PartitionCols: partitionCols}
prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols}
newMpp := mpp.enforceExchangerImpl(prop)
if newMpp.invalid() {
return newMpp
Expand Down Expand Up @@ -2042,7 +2045,7 @@ type mppTask struct {
p PhysicalPlan
cst float64

partTp property.PartitionType
partTp property.MPPPartitionType
hashCols []*expression.Column
}

Expand Down Expand Up @@ -2091,7 +2094,7 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
cst := t.cst + t.count()*ctx.GetSessionVars().GetNetworkFactor(nil)
p.cost = cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
if p.ctx.GetSessionVars().IsMPPEnforced() {
p.cost = 0
p.cost = cst / 1000000000
}
rt := &rootTask{
p: p,
Expand All @@ -2101,7 +2104,7 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
}

func (t *mppTask) needEnforce(prop *property.PhysicalProperty) bool {
switch prop.PartitionTp {
switch prop.MPPPartitionTp {
case property.AnyType:
return false
case property.BroadcastType:
Expand All @@ -2111,10 +2114,10 @@ func (t *mppTask) needEnforce(prop *property.PhysicalProperty) bool {
return true
}
// TODO: consider equalivant class
if len(prop.PartitionCols) != len(t.hashCols) {
if len(prop.MPPPartitionCols) != len(t.hashCols) {
return true
}
for i, col := range prop.PartitionCols {
for i, col := range prop.MPPPartitionCols {
if !col.Equal(nil, t.hashCols[i]) {
return true
}
Expand All @@ -2125,6 +2128,7 @@ func (t *mppTask) needEnforce(prop *property.PhysicalProperty) bool {

func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
if len(prop.SortItems) != 0 {
t.p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("Can't use mpp mode because operator `Sort` is not supported now.")
return &mppTask{}
}
if !t.needEnforce(prop) {
Expand All @@ -2134,17 +2138,17 @@ func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
}

func (t *mppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *mppTask {
if collate.NewCollationEnabled() && prop.PartitionTp == property.HashType {
for _, col := range prop.PartitionCols {
if collate.NewCollationEnabled() && prop.MPPPartitionTp == property.HashType {
for _, col := range prop.MPPPartitionCols {
if types.IsString(col.RetType.Tp) {
return &mppTask{cst: math.MaxFloat64}
}
}
}
ctx := t.p.SCtx()
sender := PhysicalExchangeSender{
ExchangeType: tipb.ExchangeType(prop.PartitionTp),
HashCols: prop.PartitionCols,
ExchangeType: tipb.ExchangeType(prop.MPPPartitionTp),
HashCols: prop.MPPPartitionCols,
}.Init(ctx, t.p.statsInfo())
sender.SetChildren(t.p)
receiver := PhysicalExchangeReceiver{}.Init(ctx, t.p.statsInfo())
Expand All @@ -2155,7 +2159,7 @@ func (t *mppTask) enforceExchangerImpl(prop *property.PhysicalProperty) *mppTask
return &mppTask{
p: receiver,
cst: cst,
partTp: prop.PartitionTp,
hashCols: prop.PartitionCols,
partTp: prop.MPPPartitionTp,
hashCols: prop.MPPPartitionCols,
}
}
Loading