diff --git a/executor/builder.go b/executor/builder.go index 7736d53b5e2b0..83d6940eae789 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1980,7 +1980,7 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan streaming := true executors := make([]*tipb.Executor, 0, len(plans)) for _, p := range plans { - execPB, err := p.ToPB(sctx) + execPB, err := p.ToPB(sctx, kv.TiKV) if err != nil { return nil, false, err } @@ -2002,7 +2002,13 @@ func markChildrenUsedCols(outputSchema *expression.Schema, childSchema ...*expre return } -func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { +func constructDistExecForTiFlash(sctx sessionctx.Context, p plannercore.PhysicalPlan) ([]*tipb.Executor, bool, error) { + execPB, err := p.ToPB(sctx, kv.TiFlash) + return []*tipb.Executor{execPB}, false, err + +} + +func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan, storeType kv.StoreType) (dagReq *tipb.DAGRequest, streaming bool, err error) { dagReq = &tipb.DAGRequest{} dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(b.ctx.GetSessionVars().Location()) sc := b.ctx.GetSessionVars().StmtCtx @@ -2011,7 +2017,13 @@ func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dag dagReq.CollectExecutionSummaries = &collExec } dagReq.Flags = sc.PushDownFlags() - dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans) + if storeType == kv.TiFlash { + var executors []*tipb.Executor + executors, streaming, err = constructDistExecForTiFlash(b.ctx, plans[0]) + dagReq.RootExecutor = executors[0] + } else { + dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans) + } distsql.SetEncodeType(b.ctx, dagReq) return dagReq, streaming, err @@ -2241,7 +2253,7 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) { case 1: for _, p := range v.TablePlans { switch p.(type) { - case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN: + case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN, *plannercore.PhysicalBroadCastJoin: e.batchCop = true } } @@ -2252,11 +2264,15 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) { } func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) { - dagReq, streaming, err := b.constructDAGReq(v.TablePlans) + tablePlans := v.TablePlans + if v.StoreType == kv.TiFlash { + tablePlans = []plannercore.PhysicalPlan{v.GetTablePlan()} + } + dagReq, streaming, err := b.constructDAGReq(tablePlans, v.StoreType) if err != nil { return nil, err } - ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + ts := v.GetTableScan() tbl, _ := b.is.TableByID(ts.Table.ID) isPartition, physicalTableID := ts.IsPartition() if isPartition { @@ -2279,6 +2295,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea corColInFilter: b.corColInDistPlan(v.TablePlans), corColInAccess: b.corColInAccess(v.TablePlans[0]), plans: v.TablePlans, + tablePlan: v.GetTablePlan(), storeType: v.StoreType, } e.setBatchCop(v) @@ -2323,7 +2340,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) * return nil } - ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + ts := v.GetTableScan() ret.ranges = ts.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) @@ -2331,7 +2348,7 
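Editor's note, not part of the patch: the hunks above split constructDAGReq by store type because the TiKV coprocessor consumes a flat list of executors (a straight chain, leaf scan first), while the TiFlash request carries a single RootExecutor whose children are nested explicitly, which is what lets a two-child broadcast join be pushed down. A minimal standalone sketch of that list-versus-tree distinction, using simplified stand-in types (ExecNode, encodeList, encodeTree are illustrative, not the real tipb/TiDB API):

package main

import "fmt"

// ExecNode is a simplified stand-in for a physical plan node / encoded executor.
type ExecNode struct {
	Name     string
	Children []*ExecNode
}

// encodeList flattens a single-child chain into the TiKV-style executor list,
// leaf (table scan) first, the order in which the flat Executors field is consumed.
func encodeList(root *ExecNode) []string {
	var chain []string
	for cur := root; cur != nil; {
		chain = append([]string{cur.Name}, chain...)
		if len(cur.Children) == 0 {
			break
		}
		cur = cur.Children[0] // a TiKV DAG is always a straight chain
	}
	return chain
}

// encodeTree keeps the explicit tree shape, as the TiFlash RootExecutor does,
// so an operator with two children (a join) can be expressed.
func encodeTree(root *ExecNode) string {
	if len(root.Children) == 0 {
		return root.Name
	}
	s := root.Name + "("
	for i, c := range root.Children {
		if i > 0 {
			s += ", "
		}
		s += encodeTree(c)
	}
	return s + ")"
}

func main() {
	scan := &ExecNode{Name: "TableScan"}
	sel := &ExecNode{Name: "Selection", Children: []*ExecNode{scan}}
	fmt.Println(encodeList(sel)) // [TableScan Selection]

	join := &ExecNode{Name: "BroadcastJoin", Children: []*ExecNode{
		{Name: "TableScan(build)"}, {Name: "TableScan(probe)"},
	}}
	fmt.Println(encodeTree(join)) // BroadcastJoin(TableScan(build), TableScan(probe))
}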
@@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) * } func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, error) { - dagReq, streaming, err := b.constructDAGReq(v.IndexPlans) + dagReq, streaming, err := b.constructDAGReq(v.IndexPlans, kv.TiKV) if err != nil { return nil, err } @@ -2405,7 +2422,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) * } func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, val table.Table, err error) { - tableReq, tableStreaming, err := b.constructDAGReq(plans) + tableReq, tableStreaming, err := b.constructDAGReq(plans, kv.TiKV) if err != nil { return nil, false, nil, err } @@ -2423,7 +2440,7 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic } func buildIndexReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { - indexReq, indexStreaming, err := b.constructDAGReq(plans) + indexReq, indexStreaming, err := b.constructDAGReq(plans, kv.TiKV) if err != nil { return nil, false, err } diff --git a/executor/table_reader.go b/executor/table_reader.go index c7481bc59830e..712a17ab72f54 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -70,6 +70,7 @@ type TableReaderExecutor struct { resultHandler *tableResultHandler feedback *statistics.QueryFeedback plans []plannercore.PhysicalPlan + tablePlan plannercore.PhysicalPlan memTracker *memory.Tracker selectResultHook // for testing @@ -104,9 +105,17 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { var err error if e.corColInFilter { - e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) - if err != nil { - return err + if e.storeType == kv.TiFlash { + execs, _, err := constructDistExecForTiFlash(e.ctx, e.tablePlan) + if err != nil { + return err + } + e.dagPB.RootExecutor = execs[0] + } else { + e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) + if err != nil { + return err + } } } if e.runtimeStats != nil { diff --git a/executor/table_readers_required_rows_test.go b/executor/table_readers_required_rows_test.go index 6cbf2b20d4fb9..ac1e8d4ba5406 100644 --- a/executor/table_readers_required_rows_test.go +++ b/executor/table_readers_required_rows_test.go @@ -128,7 +128,7 @@ func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest { Columns: []*model.ColumnInfo{}, Table: &model.TableInfo{ID: 12345, PKIsHandle: false}, Desc: false, - }}) + }}, kv.TiKV) if err != nil { panic(err) } diff --git a/go.mod b/go.mod index 8ba6b06f3cd5e..2cf750382a1fa 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/pingcap/pd/v4 v4.0.5-0.20200817114353-e465cafe8a91 github.com/pingcap/sysutil v0.0.0-20200715082929-4c47bcac246a github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible - github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce + github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 diff --git a/go.sum b/go.sum index 1631a732a7377..412e248ed3e64 100644 --- a/go.sum +++ b/go.sum @@ -479,8 +479,9 @@ github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible/go.mod 
h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= -github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce h1:LDyY6Xh/Z/SHVQ10erWtoOwIxHSTtlpPQ9cvS+BfRMY= github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 h1:ESL3eIt1kUt8IMvR1011ejZlAyDcOzw89ARvVHvpD5k= +github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index f48d7d8ffda12..7d553a4fd91dc 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -915,6 +915,8 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, driverSide, indent st buildSide = plan.InnerChildIdx ^ 1 case *PhysicalIndexHashJoin: buildSide = plan.InnerChildIdx ^ 1 + case *PhysicalBroadCastJoin: + buildSide = plan.InnerChildIdx } if buildSide != -1 { diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index ab91403ebd349..ae6532569a0d1 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/ranger" @@ -39,6 +40,9 @@ import ( ) func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { + if prop.IsFlashOnlyProp() { + return nil, true + } childProp := prop.Clone() us := PhysicalUnionScan{ Conditions: p.conditions, @@ -1434,11 +1438,25 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P } }) + if prop.IsFlashOnlyProp() && ((p.preferJoinType&preferBCJoin) == 0 && p.preferJoinType > 0) { + return nil, false + } + joins := make([]PhysicalPlan, 0, 8) + if p.ctx.GetSessionVars().AllowBCJ { + broadCastJoins := p.tryToGetBroadCastJoin(prop) + if (p.preferJoinType & preferBCJoin) > 0 { + return broadCastJoins, true + } + joins = append(joins, broadCastJoins...) + } + if prop.IsFlashOnlyProp() { + return joins, true + } + mergeJoins := p.GetMergeJoin(prop, p.schema, p.Stats(), p.children[0].statsInfo(), p.children[1].statsInfo()) if (p.preferJoinType&preferMergeJoin) > 0 && len(mergeJoins) > 0 { return mergeJoins, true } - joins := make([]PhysicalPlan, 0, 5) joins = append(joins, mergeJoins...) 
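Editor's note, not part of the patch: the new branch in LogicalJoin.exhaustPhysicalPlans gates broadcast-join candidates on three things: the flash-only property, the tidb_opt_broadcast_join session variable (AllowBCJ), and the broadcast_join hint. A standalone sketch of that decision order, with simplified flags standing in for the session variable, the property check, and the hint bits:

package main

import "fmt"

// Simplified stand-in flags; the real code reads them from the session vars,
// the required physical property, and the join's preferJoinType hint bits.
type joinGate struct {
	flashOnlyProp  bool // prop.IsFlashOnlyProp()
	allowBCJ       bool // tidb_opt_broadcast_join session variable
	preferBCJ      bool // /*+ broadcast_join(...) */ hint present
	otherJoinHints bool // some other join hint, e.g. merge_join / inl_join
}

// candidateKinds mirrors the order added to LogicalJoin.exhaustPhysicalPlans:
// hints win first, then the flash-only property restricts the search space.
func candidateKinds(g joinGate) []string {
	if g.flashOnlyProp && g.otherJoinHints && !g.preferBCJ {
		return nil // a non-broadcast join hint can never be honoured under a flash-only property
	}
	var kinds []string
	if g.allowBCJ {
		kinds = append(kinds, "broadcast")
		if g.preferBCJ {
			return kinds // the hint forces broadcast join only
		}
	}
	if g.flashOnlyProp {
		return kinds // only cop[tiflash] plans may be returned
	}
	return append(kinds, "merge", "index", "hash")
}

func main() {
	fmt.Println(candidateKinds(joinGate{allowBCJ: true}))                  // [broadcast merge index hash]
	fmt.Println(candidateKinds(joinGate{allowBCJ: true, preferBCJ: true})) // [broadcast]
}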
indexJoins, forced := p.tryToGetIndexJoin(prop) @@ -1462,10 +1480,72 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P return joins, true } +func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []PhysicalPlan { + /// todo remove this restriction after join on new collation is supported in TiFlash + if collate.NewCollationEnabled() { + return nil + } + if !prop.IsEmpty() { + return nil + } + if prop.TaskTp != property.RootTaskType && !prop.IsFlashOnlyProp() { + return nil + } + + if p.JoinType != InnerJoin || len(p.LeftConditions) != 0 || len(p.RightConditions) != 0 || len(p.OtherConditions) != 0 || len(p.EqualConditions) == 0 { + return nil + } + + if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer { + return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1-idx) + } + results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0) + results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...) + return results +} + +func (p *LogicalJoin) tryToGetBroadCastJoinByPreferGlobalIdx(prop *property.PhysicalProperty, preferredGlobalIndex int) []PhysicalPlan { + lkeys, rkeys := p.GetJoinKeys() + baseJoin := basePhysicalJoin{ + JoinType: p.JoinType, + LeftConditions: p.LeftConditions, + RightConditions: p.RightConditions, + DefaultValues: p.DefaultValues, + LeftJoinKeys: lkeys, + RightJoinKeys: rkeys, + } + + preferredBuildIndex := 0 + if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() { + preferredBuildIndex = 1 + } + baseJoin.InnerChildIdx = preferredBuildIndex + childrenReqProps := make([]*property.PhysicalProperty, 2) + childrenReqProps[preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType, ExpectedCnt: math.MaxFloat64} + if prop.TaskTp == property.CopTiFlashGlobalReadTaskType { + childrenReqProps[1-preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType, ExpectedCnt: math.MaxFloat64} + } else { + childrenReqProps[1-preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashLocalReadTaskType, ExpectedCnt: math.MaxFloat64} + } + if prop.ExpectedCnt < p.stats.RowCount { + expCntScale := prop.ExpectedCnt / p.stats.RowCount + childrenReqProps[1-baseJoin.InnerChildIdx].ExpectedCnt = p.children[1-baseJoin.InnerChildIdx].statsInfo().RowCount * expCntScale + } + + join := PhysicalBroadCastJoin{ + basePhysicalJoin: baseJoin, + globalChildIndex: preferredGlobalIndex, + }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, childrenReqProps...) + return []PhysicalPlan{join} +} + // TryToGetChildProp will check if this sort property can be pushed or not. // When a sort column will be replaced by scalar function, we refuse it. // When a sort column will be replaced by a constant, we just remove it. 
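Editor's note, not part of the patch: in tryToGetBroadCastJoinByPreferGlobalIdx the build side (InnerChildIdx) is simply the child with fewer estimated rows, while the global/local read assignment is independent of it: the child at preferredGlobalIndex is always required to produce a TiFlash global-read cop task (it is read in full and broadcast), and the other child stays a local read unless the join itself is already being built under a global-read requirement. A standalone sketch of just that child-property assignment, with a simplified TaskType:

package main

import "fmt"

// TaskType is a simplified stand-in for property.TaskType.
type TaskType string

const (
	TiFlashLocalRead  TaskType = "copTiFlashLocalReadTask"
	TiFlashGlobalRead TaskType = "copTiFlashGlobalReadTask"
)

// childTaskTypes mirrors how tryToGetBroadCastJoinByPreferGlobalIdx builds the two
// children's required properties: the broadcast (global) child is always a global
// read; the other child stays local unless the whole join is itself required to be
// a global-read subtree.
func childTaskTypes(globalChildIdx int, parent TaskType) [2]TaskType {
	var props [2]TaskType
	props[globalChildIdx] = TiFlashGlobalRead
	if parent == TiFlashGlobalRead {
		props[1-globalChildIdx] = TiFlashGlobalRead
	} else {
		props[1-globalChildIdx] = TiFlashLocalRead
	}
	return props
}

func main() {
	fmt.Println(childTaskTypes(1, TiFlashLocalRead))  // [copTiFlashLocalReadTask copTiFlashGlobalReadTask]
	fmt.Println(childTaskTypes(0, TiFlashGlobalRead)) // [copTiFlashGlobalReadTask copTiFlashGlobalReadTask]
}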
func (p *LogicalProjection) TryToGetChildProp(prop *property.PhysicalProperty) (*property.PhysicalProperty, bool) { + if prop.IsFlashOnlyProp() { + return nil, false + } newProp := &property.PhysicalProperty{TaskTp: property.RootTaskType, ExpectedCnt: prop.ExpectedCnt} newCols := make([]property.Item, 0, len(prop.Items)) for _, col := range prop.Items { @@ -1495,9 +1575,10 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty return []PhysicalPlan{proj}, true } -func (lt *LogicalTopN) getPhysTopN() []PhysicalPlan { - ret := make([]PhysicalPlan, 0, 3) - for _, tp := range wholeTaskTypes { +func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPlan { + allTaskTypes := prop.GetAllPossibleChildTaskTypes() + ret := make([]PhysicalPlan, 0, len(allTaskTypes)) + for _, tp := range allTaskTypes { resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: math.MaxFloat64} topN := PhysicalTopN{ ByItems: lt.ByItems, @@ -1509,14 +1590,15 @@ func (lt *LogicalTopN) getPhysTopN() []PhysicalPlan { return ret } -func (lt *LogicalTopN) getPhysLimits() []PhysicalPlan { - prop, canPass := GetPropByOrderByItems(lt.ByItems) +func (lt *LogicalTopN) getPhysLimits(prop *property.PhysicalProperty) []PhysicalPlan { + p, canPass := GetPropByOrderByItems(lt.ByItems) if !canPass { return nil } + allTaskTypes := prop.GetAllPossibleChildTaskTypes() ret := make([]PhysicalPlan, 0, 3) - for _, tp := range wholeTaskTypes { - resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(lt.Count + lt.Offset), Items: prop.Items} + for _, tp := range allTaskTypes { + resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(lt.Count + lt.Offset), Items: p.Items} limit := PhysicalLimit{ Count: lt.Count, Offset: lt.Offset, @@ -1542,7 +1624,7 @@ func MatchItems(p *property.PhysicalProperty, items []*util.ByItems) bool { func (lt *LogicalTopN) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { if MatchItems(prop, lt.ByItems) { - return append(lt.getPhysTopN(), lt.getPhysLimits()...), true + return append(lt.getPhysTopN(prop), lt.getPhysLimits(prop)...), true } return nil, true } @@ -1553,7 +1635,7 @@ func (la *LogicalApply) GetHashJoin(prop *property.PhysicalProperty) *PhysicalHa } func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { - if !prop.AllColsFromSchema(la.children[0].Schema()) { // for convenient, we don't pass through any prop + if !prop.AllColsFromSchema(la.children[0].Schema()) || prop.IsFlashOnlyProp() { // for convenient, we don't pass through any prop return nil, true } join := la.GetHashJoin(prop) @@ -1570,6 +1652,9 @@ func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([ } func (p *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { + if prop.IsFlashOnlyProp() { + return nil, true + } var byItems []property.Item byItems = append(byItems, p.PartitionBy...) byItems = append(byItems, p.OrderBy...) 
@@ -1603,8 +1688,12 @@ func (la *LogicalAggregation) canPushToCop() bool { } func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan { + if prop.IsFlashOnlyProp() { + return nil + } _, desc := prop.AllSameOrder() - enforcedAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes)) + allTaskTypes := prop.GetAllPossibleChildTaskTypes() + enforcedAggs := make([]PhysicalPlan, 0, len(allTaskTypes)) childProp := &property.PhysicalProperty{ ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt), Enforced: true, @@ -1652,6 +1741,10 @@ func (la *LogicalAggregation) distinctArgsMeetsProperty() bool { } func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan { + // TODO: support CopTiFlash task type in stream agg + if prop.IsFlashOnlyProp() { + return nil + } all, desc := prop.AllSameOrder() if !all { return nil @@ -1667,7 +1760,8 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P return nil } - streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(wholeTaskTypes)-1)+len(wholeTaskTypes)) + allTaskTypes := prop.GetAllPossibleChildTaskTypes() + streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(allTaskTypes)-1)+len(allTaskTypes)) childProp := &property.PhysicalProperty{ ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt), } @@ -1718,8 +1812,11 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy if !prop.IsEmpty() { return nil } - hashAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes)) + hashAggs := make([]PhysicalPlan, 0, len(prop.GetAllPossibleChildTaskTypes())) taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType} + if la.ctx.GetSessionVars().AllowBCJ { + taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType) + } if la.HasDistinct() { // TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented. // If AllowDistinctAggPushDown is set to true, we should not consider RootTask. 
@@ -1729,6 +1826,9 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy } else if !la.aggHints.preferAggToCop { taskTypes = append(taskTypes, property.RootTaskType) } + if prop.IsFlashOnlyProp() { + taskTypes = []property.TaskType{prop.TaskTp} + } for _, taskTp := range taskTypes { agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp}) agg.SetSchema(la.schema.Clone()) @@ -1797,8 +1897,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] if !prop.IsEmpty() { return nil, true } - ret := make([]PhysicalPlan, 0, len(wholeTaskTypes)) - for _, tp := range wholeTaskTypes { + allTaskTypes := prop.GetAllPossibleChildTaskTypes() + ret := make([]PhysicalPlan, 0, len(allTaskTypes)) + for _, tp := range allTaskTypes { resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(p.Count + p.Offset)} limit := PhysicalLimit{ Offset: p.Offset, @@ -1810,6 +1911,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] } func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { + if prop.IsFlashOnlyProp() { + return nil, true + } childProp := prop.Clone() lock := PhysicalLock{ Lock: p.Lock, @@ -1821,7 +1925,7 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { // TODO: UnionAll can not pass any order, but we can change it to sort merge to keep order. - if !prop.IsEmpty() { + if !prop.IsEmpty() || prop.IsFlashOnlyProp() { return nil, true } chReqProps := make([]*property.PhysicalProperty, 0, len(p.children)) @@ -1871,7 +1975,7 @@ func (ls *LogicalSort) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] } func (p *LogicalMaxOneRow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { - if !prop.IsEmpty() { + if !prop.IsEmpty() || prop.IsFlashOnlyProp() { return nil, true } mor := PhysicalMaxOneRow{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2}) diff --git a/planner/core/explain.go b/planner/core/explain.go index 71f5a0b3fbbd9..e0f74ee827a45 100644 --- a/planner/core/explain.go +++ b/planner/core/explain.go @@ -214,6 +214,9 @@ func (p *PhysicalTableScan) OperatorInfo(normalized bool) string { if p.stats.StatsVersion == statistics.PseudoVersion && !normalized { buffer.WriteString("stats:pseudo, ") } + if p.IsGlobalRead { + buffer.WriteString("global read, ") + } buffer.Truncate(buffer.Len() - 2) return buffer.String() } diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 59a0bff0e93b0..a650b66b20f08 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -160,7 +160,13 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPl // combine best child tasks with parent physical plan. curTask := pp.attach2Task(childTasks...) 
- // enforce curTask property + if prop.IsFlashOnlyProp() { + if _, ok := curTask.(*copTask); !ok { + continue + } + } + + // Enforce curTask property if prop.Enforced { curTask = enforceProperty(prop, curTask, p.basePlan.ctx) } @@ -192,7 +198,7 @@ func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTas return bestTask, nil } - if prop.TaskTp != property.RootTaskType { + if prop.TaskTp != property.RootTaskType && prop.TaskTp != property.CopTiFlashLocalReadTaskType && prop.TaskTp != property.CopTiFlashGlobalReadTaskType { // Currently all plan cannot totally push down. p.storeTask(prop, invalidTask) return invalidTask, nil @@ -402,9 +408,26 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida if len(path.Ranges) == 0 && !ds.ctx.GetSessionVars().StmtCtx.UseCache { return []*candidatePath{{path: path}} } + if path.StoreType != kv.TiFlash && (prop.TaskTp == property.CopTiFlashLocalReadTaskType || prop.TaskTp == property.CopTiFlashGlobalReadTaskType) { + continue + } var currentCandidate *candidatePath if path.IsTablePath { - currentCandidate = ds.getTableCandidate(path, prop) + if path.StoreType == kv.TiFlash { + if path.IsTiFlashGlobalRead && prop.TaskTp == property.CopTiFlashGlobalReadTaskType { + currentCandidate = ds.getTableCandidate(path, prop) + } + if !path.IsTiFlashGlobalRead && prop.TaskTp != property.CopTiFlashGlobalReadTaskType { + currentCandidate = ds.getTableCandidate(path, prop) + } + } else { + if !path.IsTiFlashGlobalRead && !prop.IsFlashOnlyProp() { + currentCandidate = ds.getTableCandidate(path, prop) + } + } + if currentCandidate == nil { + continue + } } else { coveredByIdx := isCoveringIndex(ds.schema.Columns, path.FullIdxCols, path.FullIdxColLens, ds.tableInfo.PKIsHandle) if len(path.AccessConds) > 0 || !prop.IsEmpty() || path.Forced || coveredByIdx { @@ -1172,6 +1195,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid copTask.keepOrder = true } ts.addPushedDownSelection(copTask, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt)) + if prop.IsFlashOnlyProp() && len(copTask.rootTaskConds) != 0 { + return invalidTask, nil + } if prop.TaskTp == property.RootTaskType { task = finishCopTask(ds.ctx, task) } else if _, ok := task.(*rootTask); ok { @@ -1348,6 +1374,7 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper AccessCondition: path.AccessConds, filterCondition: path.TableFilters, StoreType: path.StoreType, + IsGlobalRead: path.IsTiFlashGlobalRead, }.Init(ds.ctx, ds.blockOffset) ts.SetSchema(ds.schema.Clone()) if ts.Table.PKIsHandle { @@ -1391,6 +1418,9 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper } sessVars := ds.ctx.GetSessionVars() cost := rowCount * rowSize * sessVars.ScanFactor + if ts.IsGlobalRead { + cost += rowCount * sessVars.NetworkFactor * rowSize + } if isMatchProp { if prop.Items[0].Desc { ts.Desc = true diff --git a/planner/core/hints.go b/planner/core/hints.go index d8d8f6ff80d92..86019b204c334 100644 --- a/planner/core/hints.go +++ b/planner/core/hints.go @@ -166,6 +166,8 @@ func genHintsFromPhysicalPlan(p PhysicalPlan, nodeType utilhint.NodeType) (res [ }) case *PhysicalMergeJoin: res = append(res, getJoinHints(p.SCtx(), HintSMJ, p.SelectBlockOffset(), nodeType, pp.children...)...) + case *PhysicalBroadCastJoin: + res = append(res, getJoinHints(p.SCtx(), HintBCJ, p.SelectBlockOffset(), nodeType, pp.children...)...) 
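Editor's note, not part of the patch: the skylinePruning change above filters table access paths by the required task type: non-TiFlash paths are dropped under either TiFlash-only requirement, a TiFlash global-read path only survives under a global-read requirement, and a TiFlash local path survives under anything except a global-read requirement. A standalone sketch of that predicate, with simplified stand-in types (keepTablePath is an illustrative name, not the real API):

package main

import "fmt"

type storeType string
type taskType string

const (
	tikv    storeType = "tikv"
	tiflash storeType = "tiflash"

	rootTask        taskType = "rootTask"
	flashLocalTask  taskType = "copTiFlashLocalReadTask"
	flashGlobalTask taskType = "copTiFlashGlobalReadTask"
)

type accessPath struct {
	store      storeType
	globalRead bool
}

// keepTablePath mirrors the filter added to DataSource.skylinePruning: a flash-only
// requirement admits only TiFlash paths whose read flavour matches, and a TiFlash
// global-read path is never considered outside a global-read requirement.
func keepTablePath(p accessPath, tp taskType) bool {
	flashOnly := tp == flashLocalTask || tp == flashGlobalTask
	if p.store != tiflash {
		return !flashOnly && !p.globalRead
	}
	if p.globalRead {
		return tp == flashGlobalTask
	}
	return tp != flashGlobalTask
}

func main() {
	fmt.Println(keepTablePath(accessPath{store: tiflash, globalRead: true}, flashGlobalTask)) // true
	fmt.Println(keepTablePath(accessPath{store: tiflash, globalRead: true}, rootTask))        // false
	fmt.Println(keepTablePath(accessPath{store: tikv}, flashLocalTask))                       // false
}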
case *PhysicalHashJoin: res = append(res, getJoinHints(p.SCtx(), HintHJ, p.SelectBlockOffset(), nodeType, pp.children...)...) case *PhysicalIndexJoin: diff --git a/planner/core/initialize.go b/planner/core/initialize.go index 116478eb446f9..ff923cae70178 100644 --- a/planner/core/initialize.go +++ b/planner/core/initialize.go @@ -318,6 +318,16 @@ func (p PhysicalHashJoin) Init(ctx sessionctx.Context, stats *property.StatsInfo return &p } +// Init initializes BatchPointGetPlan. +func (p PhysicalBroadCastJoin) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalBroadCastJoin { + tp := plancodec.TypeBroadcastJoin + p.basePhysicalPlan = newBasePhysicalPlan(ctx, tp, &p, offset) + p.childrenReqProps = props + p.stats = stats + return &p + +} + // Init initializes PhysicalMergeJoin. func (p PhysicalMergeJoin) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int) *PhysicalMergeJoin { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeMergeJoin, &p, offset) @@ -467,16 +477,18 @@ func (p PointGetPlan) Init(ctx sessionctx.Context, stats *property.StatsInfo, of return &p } +func flattenTreePlan(plan PhysicalPlan, plans []PhysicalPlan) []PhysicalPlan { + plans = append(plans, plan) + for _, child := range plan.Children() { + plans = flattenTreePlan(child, plans) + } + return plans +} + // flattenPushDownPlan converts a plan tree to a list, whose head is the leaf node like table scan. func flattenPushDownPlan(p PhysicalPlan) []PhysicalPlan { plans := make([]PhysicalPlan, 0, 5) - for { - plans = append(plans, p) - if len(p.Children()) == 0 { - break - } - p = p.Children()[0] - } + plans = flattenTreePlan(p, plans) for i := 0; i < len(plans)/2; i++ { j := len(plans) - i - 1 plans[i], plans[j] = plans[j], plans[i] diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 1ded2366ec74f..1c6ce97e48aca 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -348,6 +348,73 @@ func (s *testIntegrationSerialSuite) TestSelPushDownTiFlash(c *C) { } } +func (s *testIntegrationSerialSuite) TestBroadcastJoin(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists d1_t") + tk.MustExec("create table d1_t(d1_k int, value int)") + tk.MustExec("insert into d1_t values(1,2),(2,3)") + tk.MustExec("analyze table d1_t") + tk.MustExec("drop table if exists d2_t") + tk.MustExec("create table d2_t(d2_k decimal(10,2), value int)") + tk.MustExec("insert into d2_t values(10.11,2),(10.12,3)") + tk.MustExec("analyze table d2_t") + tk.MustExec("drop table if exists d3_t") + tk.MustExec("create table d3_t(d3_k date, value int)") + tk.MustExec("insert into d3_t values(date'2010-01-01',2),(date'2010-01-02',3)") + tk.MustExec("analyze table d3_t") + tk.MustExec("drop table if exists fact_t") + tk.MustExec("create table fact_t(d1_k int, d2_k decimal(10,2), d3_k date, col1 int, col2 int, col3 int)") + tk.MustExec("insert into fact_t values(1,10.11,date'2010-01-01',1,2,3),(1,10.11,date'2010-01-02',1,2,3),(1,10.12,date'2010-01-01',1,2,3),(1,10.12,date'2010-01-02',1,2,3)") + tk.MustExec("insert into fact_t values(2,10.11,date'2010-01-01',1,2,3),(2,10.11,date'2010-01-02',1,2,3),(2,10.12,date'2010-01-01',1,2,3),(2,10.12,date'2010-01-02',1,2,3)") + tk.MustExec("analyze table fact_t") + + // Create virtual tiflash replica info. 
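Editor's note, not part of the patch: flattenPushDownPlan used to assume the pushed-down plan is a single-child chain; with broadcast joins it can be a tree, so the new flattenTreePlan does a pre-order walk and the result is then reversed so the head of the slice is still a leaf scan. A standalone sketch (node/flattenTree/flattenPushDown are illustrative types and names) showing the resulting order for a plan shaped like the ones in the new test output:

package main

import "fmt"

// node is a simplified stand-in for a pushed-down physical plan node.
type node struct {
	name     string
	children []*node
}

// flattenTree mirrors the new flattenTreePlan: parent first, then each subtree.
func flattenTree(p *node, out []string) []string {
	out = append(out, p.name)
	for _, c := range p.children {
		out = flattenTree(c, out)
	}
	return out
}

// flattenPushDown mirrors flattenPushDownPlan: flatten, then reverse, so the head
// of the slice is a leaf (a table scan), just as in the old single-chain case.
func flattenPushDown(p *node) []string {
	out := flattenTree(p, make([]string, 0, 5))
	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
		out[i], out[j] = out[j], out[i]
	}
	return out
}

func main() {
	plan := &node{name: "StreamAgg", children: []*node{
		{name: "BroadcastJoin", children: []*node{
			{name: "Selection(build)", children: []*node{{name: "TableScan(d1_t)"}}},
			{name: "Selection(probe)", children: []*node{{name: "TableScan(fact_t)"}}},
		}},
	}}
	fmt.Println(flattenPushDown(plan))
	// [TableScan(fact_t) Selection(probe) TableScan(d1_t) Selection(build) BroadcastJoin StreamAgg]
}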
+ dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "fact_t" || tblInfo.Name.L == "d1_t" || tblInfo.Name.L == "d2_t" || tblInfo.Name.L == "d3_t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@session.tidb_allow_batch_cop = 1") + tk.MustExec("set @@session.tidb_opt_broadcast_join = 1") + var input []string + var output []struct { + SQL string + Plan []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } + + // out join not supported + _, err := tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query") + // join with non-equal condition not supported + _, err = tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t) */ count(*) from fact_t join d1_t on fact_t.d1_k = d1_t.d1_k and fact_t.col1 > d1_t.value") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query") + // cartsian join not supported + _, err = tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t) */ count(*) from fact_t join d1_t") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query") +} + func (s *testIntegrationSerialSuite) TestAggPushDownEngine(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 0b7ba20fd3ce5..262eb99f1e87d 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -59,6 +59,15 @@ const ( TiDBMergeJoin = "tidb_smj" // HintSMJ is hint enforce merge join. HintSMJ = "merge_join" + + // TiDBBroadCastJoin indicates applying broadcast join by force. + TiDBBroadCastJoin = "tidb_bcj" + + // HintBCJ indicates applying broadcast join by force. + HintBCJ = "broadcast_join" + // HintBCJPreferLocal specifies the preferred local read table + HintBCJPreferLocal = "broadcast_join_local" + // TiDBIndexNestedLoopJoin is hint enforce index nested loop join. TiDBIndexNestedLoopJoin = "tidb_inlj" // HintINLJ is hint enforce index nested loop join. 
@@ -460,6 +469,19 @@ func extractTableAlias(p Plan, parentOffset int) *hintTableInfo { return nil } +func (p *LogicalJoin) getPreferredBCJLocalIndex() (hasPrefer bool, prefer int) { + if p.hintInfo == nil { + return + } + if p.hintInfo.ifPreferAsLocalInBCJoin(p.children[0], p.blockOffset) { + return true, 0 + } + if p.hintInfo.ifPreferAsLocalInBCJoin(p.children[1], p.blockOffset) { + return true, 1 + } + return false, 0 +} + func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) { if hintInfo == nil { return @@ -470,6 +492,9 @@ func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) { if hintInfo.ifPreferMergeJoin(lhsAlias, rhsAlias) { p.preferJoinType |= preferMergeJoin } + if hintInfo.ifPreferBroadcastJoin(lhsAlias, rhsAlias) { + p.preferJoinType |= preferBCJoin + } if hintInfo.ifPreferHashJoin(lhsAlias, rhsAlias) { p.preferJoinType |= preferHashJoin } @@ -2274,11 +2299,11 @@ func (b *PlanBuilder) pushHintWithoutTableWarning(hint *ast.TableOptimizerHint) func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType utilhint.NodeType, currentLevel int) { hints = b.hintProcessor.GetCurrentStmtHints(hints, nodeType, currentLevel) var ( - sortMergeTables, INLJTables, INLHJTables, INLMJTables, hashJoinTables []hintTableInfo - indexHintList, indexMergeHintList []indexHintInfo - tiflashTables, tikvTables []hintTableInfo - aggHints aggHintInfo - timeRangeHint ast.HintTimeRange + sortMergeTables, INLJTables, INLHJTables, INLMJTables, hashJoinTables, BCTables, BCJPreferLocalTables []hintTableInfo + indexHintList, indexMergeHintList []indexHintInfo + tiflashTables, tikvTables []hintTableInfo + aggHints aggHintInfo + timeRangeHint ast.HintTimeRange ) for _, hint := range hints { // Set warning for the hint that requires the table name. @@ -2294,6 +2319,10 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType u switch hint.HintName.L { case TiDBMergeJoin, HintSMJ: sortMergeTables = append(sortMergeTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) + case TiDBBroadCastJoin, HintBCJ: + BCTables = append(BCTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) + case HintBCJPreferLocal: + BCJPreferLocalTables = append(BCJPreferLocalTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) case TiDBIndexNestedLoopJoin, HintINLJ: INLJTables = append(INLJTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) 
case HintINLHJ: @@ -2364,15 +2393,17 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType u } } b.tableHintInfo = append(b.tableHintInfo, tableHintInfo{ - sortMergeJoinTables: sortMergeTables, - indexNestedLoopJoinTables: indexNestedLoopJoinTables{INLJTables, INLHJTables, INLMJTables}, - hashJoinTables: hashJoinTables, - indexHintList: indexHintList, - tiflashTables: tiflashTables, - tikvTables: tikvTables, - aggHints: aggHints, - indexMergeHintList: indexMergeHintList, - timeRangeHint: timeRangeHint, + sortMergeJoinTables: sortMergeTables, + broadcastJoinTables: BCTables, + broadcastJoinPreferredLocal: BCJPreferLocalTables, + indexNestedLoopJoinTables: indexNestedLoopJoinTables{INLJTables, INLHJTables, INLMJTables}, + hashJoinTables: hashJoinTables, + indexHintList: indexHintList, + tiflashTables: tiflashTables, + tikvTables: tikvTables, + aggHints: aggHints, + indexMergeHintList: indexMergeHintList, + timeRangeHint: timeRangeHint, }) } @@ -2384,6 +2415,8 @@ func (b *PlanBuilder) popTableHints() { b.appendUnmatchedJoinHintWarning(HintINLHJ, "", hintInfo.indexNestedLoopJoinTables.inlhjTables) b.appendUnmatchedJoinHintWarning(HintINLMJ, "", hintInfo.indexNestedLoopJoinTables.inlmjTables) b.appendUnmatchedJoinHintWarning(HintSMJ, TiDBMergeJoin, hintInfo.sortMergeJoinTables) + b.appendUnmatchedJoinHintWarning(HintBCJ, TiDBBroadCastJoin, hintInfo.broadcastJoinTables) + b.appendUnmatchedJoinHintWarning(HintBCJPreferLocal, "", hintInfo.broadcastJoinPreferredLocal) b.appendUnmatchedJoinHintWarning(HintHJ, TiDBHashJoin, hintInfo.hashJoinTables) b.appendUnmatchedStorageHintWarning(hintInfo.tiflashTables, hintInfo.tikvTables) b.tableHintInfo = b.tableHintInfo[:len(b.tableHintInfo)-1] diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 6ff83519f7de4..b0d32974c71e8 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -106,6 +106,7 @@ const ( preferRightAsINLMJInner preferHashJoin preferMergeJoin + preferBCJoin preferHashAgg preferStreamAgg ) diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 3b4c7e9d99cd3..f83be56fb69b7 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -52,6 +52,7 @@ var ( _ PhysicalPlan = &PhysicalStreamAgg{} _ PhysicalPlan = &PhysicalApply{} _ PhysicalPlan = &PhysicalIndexJoin{} + _ PhysicalPlan = &PhysicalBroadCastJoin{} _ PhysicalPlan = &PhysicalHashJoin{} _ PhysicalPlan = &PhysicalMergeJoin{} _ PhysicalPlan = &PhysicalUnionScan{} @@ -73,6 +74,27 @@ type PhysicalTableReader struct { StoreType kv.StoreType } +// GetTablePlan exports the tablePlan. +func (p *PhysicalTableReader) GetTablePlan() PhysicalPlan { + return p.tablePlan +} + +// GetTableScan exports the tableScan that contained in tablePlan. +func (p *PhysicalTableReader) GetTableScan() *PhysicalTableScan { + curPlan := p.tablePlan + for { + chCnt := len(curPlan.Children()) + if chCnt == 0 { + return curPlan.(*PhysicalTableScan) + } else if chCnt == 1 { + curPlan = curPlan.Children()[0] + } else { + join := curPlan.(*PhysicalBroadCastJoin) + curPlan = join.children[1-join.globalChildIndex] + } + } +} + // GetPhysicalTableReader returns PhysicalTableReader for logical TiKVSingleGather. 
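Editor's note, not part of the patch: PhysicalTableReader.GetTableScan has to locate the scan whose table ID and key ranges the cop request is built from. With a broadcast join in the pushed-down tree it therefore descends into the non-global child at every join, since the global-read side is fetched remotely rather than scanned by the task's ranges. A standalone sketch of that traversal with illustrative types:

package main

import "fmt"

// planNode is a simplified mirror of the shapes GetTableScan walks: single-child
// chains, plus a broadcast join whose globalChildIdx side is read remotely.
type planNode struct {
	name           string
	children       []*planNode
	globalChildIdx int // only meaningful for the join node
}

// localScan mirrors PhysicalTableReader.GetTableScan: follow single-child nodes,
// and at a broadcast join keep descending into the *non*-global child, because that
// is the locally read table whose key ranges drive the request.
func localScan(p *planNode) string {
	for {
		switch len(p.children) {
		case 0:
			return p.name
		case 1:
			p = p.children[0]
		default:
			p = p.children[1-p.globalChildIdx]
		}
	}
}

func main() {
	plan := &planNode{name: "StreamAgg", children: []*planNode{{
		name:           "BroadcastJoin",
		globalChildIdx: 0, // child 0 (d1_t) is broadcast via a global read
		children: []*planNode{
			{name: "TableScan(d1_t, global read)"},
			{name: "TableScan(fact_t)"},
		},
	}}}
	fmt.Println(localScan(plan)) // TableScan(fact_t)
}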
func (sg *TiKVSingleGather) GetPhysicalTableReader(schema *expression.Schema, stats *property.StatsInfo, props ...*property.PhysicalProperty) *PhysicalTableReader { reader := PhysicalTableReader{}.Init(sg.ctx, sg.blockOffset) @@ -246,6 +268,8 @@ type PhysicalTableScan struct { StoreType kv.StoreType + IsGlobalRead bool + // The table scan may be a partition, rather than a real table. isPartition bool // KeepOrder is true, if sort data by scanning pkcol, @@ -429,6 +453,12 @@ type PhysicalMergeJoin struct { Desc bool } +// PhysicalBroadCastJoin only works for TiFlash Engine, which broadcast the small table to every replica of probe side of tables. +type PhysicalBroadCastJoin struct { + basePhysicalJoin + globalChildIndex int +} + // PhysicalLock is the physical operator of lock, which is used for `select ... for update` clause. type PhysicalLock struct { basePhysicalPlan diff --git a/planner/core/plan.go b/planner/core/plan.go index b4a70d6dc7767..44d67c6466dcf 100644 --- a/planner/core/plan.go +++ b/planner/core/plan.go @@ -20,6 +20,7 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" @@ -208,7 +209,7 @@ type PhysicalPlan interface { attach2Task(...task) task // ToPB converts physical plan to tipb executor. - ToPB(ctx sessionctx.Context) (*tipb.Executor, error) + ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) // getChildReqProps gets the required property by child index. GetChildReqProps(idx int) *property.PhysicalProperty diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index 39537a20c2406..a81109b91c20d 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -16,8 +16,10 @@ package core import ( "github.com/pingcap/errors" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" @@ -28,12 +30,12 @@ import ( ) // ToPB implements PhysicalPlan ToPB interface. -func (p *basePhysicalPlan) ToPB(_ sessionctx.Context) (*tipb.Executor, error) { +func (p *basePhysicalPlan) ToPB(_ sessionctx.Context, _ kv.StoreType) (*tipb.Executor, error) { return nil, errors.Errorf("plan %s fails converts to PB", p.basePlan.ExplainID()) } // ToPB implements PhysicalPlan ToPB interface. 
-func (p *PhysicalHashAgg) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalHashAgg) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { sc := ctx.GetSessionVars().StmtCtx client := ctx.GetClient() groupByExprs, err := expression.ExpressionsToPBList(sc, p.GroupByItems, client) @@ -46,11 +48,20 @@ func (p *PhysicalHashAgg) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { for _, aggFunc := range p.AggFuncs { aggExec.AggFunc = append(aggExec.AggFunc, aggregation.AggFuncToPBExpr(sc, client, aggFunc)) } - return &tipb.Executor{Tp: tipb.ExecType_TypeAggregation, Aggregation: aggExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + aggExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeAggregation, Aggregation: aggExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. -func (p *PhysicalStreamAgg) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalStreamAgg) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { sc := ctx.GetSessionVars().StmtCtx client := ctx.GetClient() groupByExprs, err := expression.ExpressionsToPBList(sc, p.GroupByItems, client) @@ -63,11 +74,20 @@ func (p *PhysicalStreamAgg) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) for _, aggFunc := range p.AggFuncs { aggExec.AggFunc = append(aggExec.AggFunc, aggregation.AggFuncToPBExpr(sc, client, aggFunc)) } - return &tipb.Executor{Tp: tipb.ExecType_TypeStreamAgg, Aggregation: aggExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + aggExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeStreamAgg, Aggregation: aggExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. -func (p *PhysicalSelection) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalSelection) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { sc := ctx.GetSessionVars().StmtCtx client := ctx.GetClient() conditions, err := expression.ExpressionsToPBList(sc, p.Conditions, client) @@ -77,11 +97,20 @@ func (p *PhysicalSelection) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) selExec := &tipb.Selection{ Conditions: conditions, } - return &tipb.Executor{Tp: tipb.ExecType_TypeSelection, Selection: selExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + selExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeSelection, Selection: selExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. 
-func (p *PhysicalTopN) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalTopN) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { sc := ctx.GetSessionVars().StmtCtx client := ctx.GetClient() topNExec := &tipb.TopN{ @@ -90,19 +119,37 @@ func (p *PhysicalTopN) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { for _, item := range p.ByItems { topNExec.OrderBy = append(topNExec.OrderBy, expression.SortByItemToPB(sc, client, item.Expr, item.Desc)) } - return &tipb.Executor{Tp: tipb.ExecType_TypeTopN, TopN: topNExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + topNExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeTopN, TopN: topNExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. -func (p *PhysicalLimit) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalLimit) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { limitExec := &tipb.Limit{ Limit: p.Count, } - return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + limitExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. -func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { tsExec := &tipb.TableScan{ TableId: p.Table.ID, Columns: util.ColumnsToProto(p.Columns, p.Table.PKIsHandle), @@ -111,8 +158,19 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) if p.isPartition { tsExec.TableId = p.physicalTableID } + executorID := "" + if storeType == kv.TiFlash && p.IsGlobalRead { + tsExec.NextReadEngine = tipb.EngineType_TiFlash + ranges := distsql.TableRangesToKVRanges(tsExec.TableId, p.Ranges, nil) + for _, keyRange := range ranges { + tsExec.Ranges = append(tsExec.Ranges, tipb.KeyRange{Low: keyRange.StartKey, High: keyRange.EndKey}) + } + } + if storeType == kv.TiFlash { + executorID = p.ExplainID().String() + } err := SetPBColumnsDefaultValue(ctx, tsExec.Columns, p.Columns) - return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tsExec}, err + return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tsExec, ExecutorId: &executorID}, err } // checkCoverIndex checks whether we can pass unique info to TiKV. We should push it if and only if the length of @@ -140,7 +198,7 @@ func findColumnInfoByID(infos []*model.ColumnInfo, id int64) *model.ColumnInfo { } // ToPB implements PhysicalPlan ToPB interface. -func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context, _ kv.StoreType) (*tipb.Executor, error) { columns := make([]*model.ColumnInfo, 0, p.schema.Len()) tableColumns := p.Table.Cols() for _, col := range p.schema.Columns { @@ -164,6 +222,48 @@ func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec}, nil } +// ToPB implements PhysicalPlan ToPB interface. 
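Editor's note, not part of the patch: every single-child ToPB method above (HashAgg, StreamAgg, Selection, TopN, Limit, TableScan) gains the same TiFlash-only branch: encode the child recursively into the executor and record the explain ID, because the TiFlash request names and nests executors instead of relying on flat list order. A standalone sketch of that repeated pattern, using simplified stand-in types (pbExec, planNode, toPB are illustrative, not tipb):

package main

import "fmt"

// pbExec is a simplified stand-in for an encoded executor; only the two fields the
// TiFlash branch fills in are modelled: the nested child and the executor ID.
type pbExec struct {
	tp    string
	id    string
	child *pbExec
}

// planNode is a simplified single-child physical operator.
type planNode struct {
	kind      string
	explainID string
	child     *planNode
}

// toPB mirrors the pattern repeated in the ToPB methods above: only when encoding
// for TiFlash does an executor embed its encoded child and carry its explain ID;
// the TiKV list encoding leaves both empty and relies on the flat Executors order.
func toPB(p *planNode, forTiFlash bool) *pbExec {
	exec := &pbExec{tp: p.kind}
	if forTiFlash {
		if p.child != nil {
			exec.child = toPB(p.child, true)
		}
		exec.id = p.explainID
	}
	return exec
}

func main() {
	plan := &planNode{kind: "Selection", explainID: "Selection_21",
		child: &planNode{kind: "TableScan", explainID: "TableFullScan_20"}}
	fmt.Printf("tiflash: %+v\n", *toPB(plan, true))
	fmt.Printf("tikv:    %+v\n", *toPB(plan, false))
}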
+func (p *PhysicalBroadCastJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { + sc := ctx.GetSessionVars().StmtCtx + client := ctx.GetClient() + leftJoinKeys := make([]expression.Expression, 0, len(p.LeftJoinKeys)) + rightJoinKeys := make([]expression.Expression, 0, len(p.RightJoinKeys)) + for _, leftKey := range p.LeftJoinKeys { + leftJoinKeys = append(leftJoinKeys, leftKey) + } + for _, rightKey := range p.RightJoinKeys { + rightJoinKeys = append(rightJoinKeys, rightKey) + } + lChildren, err := p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + rChildren, err := p.children[1].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + + left, err := expression.ExpressionsToPBList(sc, leftJoinKeys, client) + if err != nil { + return nil, err + } + right, err := expression.ExpressionsToPBList(sc, rightJoinKeys, client) + if err != nil { + return nil, err + } + join := &tipb.Join{ + JoinType: tipb.JoinType_TypeInnerJoin, + JoinExecType: tipb.JoinExecType_TypeHashJoin, + InnerIdx: int64(p.InnerChildIdx), + LeftJoinKeys: left, + RightJoinKeys: right, + Children: []*tipb.Executor{lChildren, rChildren}, + } + + executorID := p.ExplainID().String() + return &tipb.Executor{Tp: tipb.ExecType_TypeJoin, Join: join, ExecutorId: &executorID}, nil +} + // SetPBColumnsDefaultValue sets the default values of tipb.ColumnInfos. func SetPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnInfo, columns []*model.ColumnInfo) error { for i, c := range columns { diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index ea0b980f63b99..4b54be428de86 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -72,14 +72,16 @@ type indexNestedLoopJoinTables struct { type tableHintInfo struct { indexNestedLoopJoinTables - sortMergeJoinTables []hintTableInfo - hashJoinTables []hintTableInfo - indexHintList []indexHintInfo - tiflashTables []hintTableInfo - tikvTables []hintTableInfo - aggHints aggHintInfo - indexMergeHintList []indexHintInfo - timeRangeHint ast.HintTimeRange + sortMergeJoinTables []hintTableInfo + broadcastJoinTables []hintTableInfo + broadcastJoinPreferredLocal []hintTableInfo + hashJoinTables []hintTableInfo + indexHintList []indexHintInfo + tiflashTables []hintTableInfo + tikvTables []hintTableInfo + aggHints aggHintInfo + indexMergeHintList []indexHintInfo + timeRangeHint ast.HintTimeRange } type hintTableInfo struct { @@ -157,10 +159,30 @@ func tableNames2HintTableInfo(ctx sessionctx.Context, hintTables []ast.HintTable return hintTableInfos } +// ifPreferAsLocalInBCJoin checks if there is a data source specified as local read by hint +func (info *tableHintInfo) ifPreferAsLocalInBCJoin(p LogicalPlan, blockOffset int) bool { + alias := extractTableAlias(p, blockOffset) + if alias != nil { + tableNames := make([]*hintTableInfo, 1) + tableNames[0] = alias + return info.matchTableName(tableNames, info.broadcastJoinPreferredLocal) + } + for _, c := range p.Children() { + if info.ifPreferAsLocalInBCJoin(c, blockOffset) { + return true + } + } + return false +} + func (info *tableHintInfo) ifPreferMergeJoin(tableNames ...*hintTableInfo) bool { return info.matchTableName(tableNames, info.sortMergeJoinTables) } +func (info *tableHintInfo) ifPreferBroadcastJoin(tableNames ...*hintTableInfo) bool { + return info.matchTableName(tableNames, info.broadcastJoinTables) +} + func (info *tableHintInfo) ifPreferHashJoin(tableNames ...*hintTableInfo) bool { return 
info.matchTableName(tableNames, info.hashJoinTables) } @@ -713,6 +735,11 @@ func isPrimaryIndex(indexName model.CIStr) bool { return indexName.L == "primary" } +func genTiFlashPath(tblInfo *model.TableInfo, isGlobalRead bool) *util.AccessPath { + tiFlashPath := &util.AccessPath{IsTablePath: true, StoreType: kv.TiFlash, IsTiFlashGlobalRead: isGlobalRead} + return tiFlashPath +} + func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr) ([]*util.AccessPath, error) { tblInfo := tbl.Meta() publicPaths := make([]*util.AccessPath, 0, len(tblInfo.Indices)+2) @@ -722,7 +749,8 @@ func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tbl ta } publicPaths = append(publicPaths, &util.AccessPath{IsTablePath: true, StoreType: tp}) if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available { - publicPaths = append(publicPaths, &util.AccessPath{IsTablePath: true, StoreType: kv.TiFlash}) + publicPaths = append(publicPaths, genTiFlashPath(tblInfo, false)) + publicPaths = append(publicPaths, genTiFlashPath(tblInfo, true)) } for _, index := range tblInfo.Indices { if index.State == model.StatePublic { diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index a65c8f14f6fad..1540050840908 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -85,7 +85,7 @@ func (p *PointGetPlan) attach2Task(...task) task { } // ToPB converts physical plan to tipb executor. -func (p *PointGetPlan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PointGetPlan) ToPB(ctx sessionctx.Context, _ kv.StoreType) (*tipb.Executor, error) { return nil, nil } @@ -240,7 +240,7 @@ func (p *BatchPointGetPlan) attach2Task(...task) task { } // ToPB converts physical plan to tipb executor. -func (p *BatchPointGetPlan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *BatchPointGetPlan) ToPB(ctx sessionctx.Context, _ kv.StoreType) (*tipb.Executor, error) { return nil, nil } diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go index f322e00dad4bc..a9c878d9bc2c6 100644 --- a/planner/core/resolve_indices.go +++ b/planner/core/resolve_indices.go @@ -113,6 +113,49 @@ func (p *PhysicalHashJoin) ResolveIndices() (err error) { return } +// ResolveIndices implements Plan interface. +func (p *PhysicalBroadCastJoin) ResolveIndices() (err error) { + err = p.physicalSchemaProducer.ResolveIndices() + if err != nil { + return err + } + lSchema := p.children[0].Schema() + rSchema := p.children[1].Schema() + for i, col := range p.LeftJoinKeys { + newKey, err := col.ResolveIndices(lSchema) + if err != nil { + return err + } + p.LeftJoinKeys[i] = newKey.(*expression.Column) + } + for i, col := range p.RightJoinKeys { + newKey, err := col.ResolveIndices(rSchema) + if err != nil { + return err + } + p.RightJoinKeys[i] = newKey.(*expression.Column) + } + for i, expr := range p.LeftConditions { + p.LeftConditions[i], err = expr.ResolveIndices(lSchema) + if err != nil { + return err + } + } + for i, expr := range p.RightConditions { + p.RightConditions[i], err = expr.ResolveIndices(rSchema) + if err != nil { + return err + } + } + for i, expr := range p.OtherConditions { + p.OtherConditions[i], err = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema)) + if err != nil { + return err + } + } + return +} + // ResolveIndices implements Plan interface. 
func (p *PhysicalMergeJoin) ResolveIndices() (err error) { err = p.physicalSchemaProducer.ResolveIndices() diff --git a/planner/core/task.go b/planner/core/task.go index 975314d905e0e..b307bbf8da7cb 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -151,6 +151,9 @@ func (t *copTask) getStoreType() kv.StoreType { } tp := t.tablePlan for len(tp.Children()) > 0 { + if len(tp.Children()) > 1 { + return kv.TiFlash + } tp = tp.Children()[0] } if ts, ok := tp.(*PhysicalTableScan); ok { @@ -519,10 +522,70 @@ func (p *PhysicalHashJoin) attach2Task(tasks ...task) task { lTask := finishCopTask(p.ctx, tasks[0].copy()) rTask := finishCopTask(p.ctx, tasks[1].copy()) p.SetChildren(lTask.plan(), rTask.plan()) - return &rootTask{ + task := &rootTask{ p: p, cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count()), } + return task +} + +// GetCost computes cost of broadcast join operator itself. +func (p *PhysicalBroadCastJoin) GetCost(lCnt, rCnt float64) float64 { + buildCnt := lCnt + if p.InnerChildIdx == 1 { + buildCnt = rCnt + } + sessVars := p.ctx.GetSessionVars() + // Cost of building hash table. + cpuCost := buildCnt * sessVars.CopCPUFactor + memoryCost := buildCnt * sessVars.MemoryFactor + // Number of matched row pairs regarding the equal join conditions. + helper := &fullJoinRowCountHelper{ + cartesian: false, + leftProfile: p.children[0].statsInfo(), + rightProfile: p.children[1].statsInfo(), + leftJoinKeys: p.LeftJoinKeys, + rightJoinKeys: p.RightJoinKeys, + leftSchema: p.children[0].Schema(), + rightSchema: p.children[1].Schema(), + } + numPairs := helper.estimate() + probeCost := numPairs * sessVars.CopCPUFactor + // should divided by the concurrency in tiflash, which should be the number of core in tiflash nodes. + probeCost /= float64(sessVars.CopTiFlashConcurrencyFactor) + cpuCost += probeCost + + // todo since TiFlash join is significant faster than TiDB join, maybe + // need to add a variable like 'tiflash_accelerate_factor', and divide + // the final cost by that factor + return cpuCost + memoryCost +} + +func (p *PhysicalBroadCastJoin) attach2Task(tasks ...task) task { + lTask, lok := tasks[0].(*copTask) + rTask, rok := tasks[1].(*copTask) + if !lok || !rok || (lTask.getStoreType() != kv.TiFlash && rTask.getStoreType() != kv.TiFlash) { + return invalidTask + } + p.SetChildren(lTask.plan(), rTask.plan()) + p.schema = BuildPhysicalJoinSchema(p.JoinType, p) + if !lTask.indexPlanFinished { + lTask.finishIndexPlan() + } + if !rTask.indexPlanFinished { + rTask.finishIndexPlan() + } + + lCost := lTask.cost() + rCost := rTask.cost() + + task := &copTask{ + tblColHists: rTask.tblColHists, + indexPlanFinished: true, + tablePlan: p, + cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()), + } + return task } // GetCost computes cost of merge join operator itself. 
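Editor's note, not part of the patch: the cost model added in PhysicalBroadCastJoin.GetCost charges CPU and memory for building the hash table over the broadcast side, plus a probe term proportional to the estimated number of matched row pairs, divided by the new CopTiFlashConcurrencyFactor because the probe runs in parallel across TiFlash cores. A standalone sketch of that formula (the factor values in main are illustrative, not the engine defaults):

package main

import "fmt"

// broadcastJoinCost mirrors PhysicalBroadCastJoin.GetCost: hash-table build cost is
// proportional to the build side's row count (CPU plus memory), and probe cost is
// proportional to the matched row pairs, scaled down by the TiFlash concurrency.
func broadcastJoinCost(buildCnt, numPairs, copCPUFactor, memoryFactor, tiflashConcurrency float64) float64 {
	cpuCost := buildCnt * copCPUFactor
	memoryCost := buildCnt * memoryFactor
	probeCost := numPairs * copCPUFactor / tiflashConcurrency
	return cpuCost + probeCost + memoryCost
}

func main() {
	// A small dimension table (2 rows) broadcast and joined against an 8-row fact table.
	fmt.Println(broadcastJoinCost(2, 8, 3.0, 0.001, 24))
}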
@@ -675,7 +738,12 @@ func finishCopTask(ctx sessionctx.Context, task task) task { } else { tp := t.tablePlan for len(tp.Children()) > 0 { - tp = tp.Children()[0] + if len(tp.Children()) == 1 { + tp = tp.Children()[0] + } else { + join := tp.(*PhysicalBroadCastJoin) + tp = join.children[1-join.InnerChildIdx] + } } ts := tp.(*PhysicalTableScan) p := PhysicalTableReader{ diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index dd62c3b372578..64707fbaadce4 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -6,6 +6,15 @@ "explain select * from t where cast(t.a as float) + 3 = 5.1" ] }, + { + "name": "TestBroadcastJoin", + "cases": [ + "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", + "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", + "explain select /*+ broadcast_join(fact_t,d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", + "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k" + ] + }, { "name": "TestReadFromStorageHint", "cases": [ diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index f3ce566e30d0e..74fda0f45a033 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -20,6 +20,75 @@ } ] }, + { + "Name": "TestBroadcastJoin", + "Cases": [ + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", + "Plan": [ + "StreamAgg_32 1.00 root funcs:count(Column#14)->Column#11", + "└─TableReader_33 1.00 root data:StreamAgg_13", + " └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14", + " └─BroadcastJoin_31 8.00 cop[tiflash] ", + " ├─Selection_23(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_22 2.00 cop[tiflash] table:d1_t keep order:false, global read", + " └─Selection_21(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))", + " └─TableFullScan_20 8.00 cop[tiflash] table:fact_t keep order:false" + ] + }, + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", + "Plan": [ + "StreamAgg_52 1.00 root funcs:count(Column#20)->Column#17", + "└─TableReader_53 1.00 root data:StreamAgg_17", + " └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20", + " └─BroadcastJoin_51 8.00 cop[tiflash] ", + " ├─Selection_43(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", + " │ └─TableFullScan_42 2.00 cop[tiflash] table:d3_t keep order:false, global read", + " └─BroadcastJoin_33(Probe) 8.00 cop[tiflash] ", + " ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", + " │ └─TableFullScan_28 2.00 cop[tiflash] table:d2_t keep order:false, global read", + " └─BroadcastJoin_37(Probe) 8.00 cop[tiflash] ", + " ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_26 2.00 cop[tiflash] table:d1_t keep order:false, 
global read", + " └─Selection_41(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", + " └─TableFullScan_40 8.00 cop[tiflash] table:fact_t keep order:false" + ] + }, + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", + "Plan": [ + "StreamAgg_25 1.00 root funcs:count(Column#14)->Column#11", + "└─TableReader_26 1.00 root data:StreamAgg_13", + " └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14", + " └─BroadcastJoin_24 8.00 cop[tiflash] ", + " ├─Selection_18(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_17 2.00 cop[tiflash] table:d1_t keep order:false", + " └─Selection_16(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))", + " └─TableFullScan_15 8.00 cop[tiflash] table:fact_t keep order:false, global read" + ] + }, + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", + "Plan": [ + "StreamAgg_36 1.00 root funcs:count(Column#20)->Column#17", + "└─TableReader_37 1.00 root data:StreamAgg_17", + " └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20", + " └─BroadcastJoin_35 8.00 cop[tiflash] ", + " ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", + " │ └─TableFullScan_28 2.00 cop[tiflash] table:d3_t keep order:false, global read", + " └─BroadcastJoin_19(Probe) 8.00 cop[tiflash] ", + " ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", + " │ └─TableFullScan_26 2.00 cop[tiflash] table:d2_t keep order:false", + " └─BroadcastJoin_20(Probe) 8.00 cop[tiflash] ", + " ├─Selection_25(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_24 2.00 cop[tiflash] table:d1_t keep order:false, global read", + " └─Selection_23(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", + " └─TableFullScan_22 8.00 cop[tiflash] table:fact_t keep order:false, global read" + ] + } + ] + }, { "Name": "TestReadFromStorageHint", "Cases": [ diff --git a/planner/property/physical_property.go b/planner/property/physical_property.go index 38da639c71942..1753031bd0bb5 100644 --- a/planner/property/physical_property.go +++ b/planner/property/physical_property.go @@ -20,6 +20,10 @@ import ( "github.com/pingcap/tidb/util/codec" ) +// wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get +// these tasks one by one. +var wholeTaskTypes = []TaskType{CopSingleReadTaskType, CopDoubleReadTaskType, RootTaskType} + // Item wraps the column and its order. type Item struct { Col *expression.Column @@ -83,6 +87,20 @@ func (p *PhysicalProperty) AllColsFromSchema(schema *expression.Schema) bool { return true } +// IsFlashOnlyProp return true if this physical property is only allowed to generate flash related task +func (p *PhysicalProperty) IsFlashOnlyProp() bool { + return p.TaskTp == CopTiFlashLocalReadTaskType || p.TaskTp == CopTiFlashGlobalReadTaskType +} + +// GetAllPossibleChildTaskTypes enumrates the possible types of tasks for children. 
+func (p *PhysicalProperty) GetAllPossibleChildTaskTypes() []TaskType { + if p.TaskTp == RootTaskType { + return wholeTaskTypes + } + // TODO: For CopSingleReadTaskType and CopDoubleReadTaskType, this function should never be called. + return []TaskType{p.TaskTp} +} + // IsPrefix checks whether the order property is the prefix of another. func (p *PhysicalProperty) IsPrefix(prop *PhysicalProperty) bool { if len(p.Items) > len(prop.Items) { diff --git a/planner/property/task_type.go b/planner/property/task_type.go index 93360fc14ce9b..3f91509ca710b 100644 --- a/planner/property/task_type.go +++ b/planner/property/task_type.go @@ -27,6 +27,17 @@ const ( // CopDoubleReadTaskType stands for the a IndexLookup tasks executed in the // coprocessor layer. CopDoubleReadTaskType + + // CopTiFlashLocalReadTaskType stands for a TiFlash coprocessor task that reads data locally, + // so only a part of the data is read in one cop task. If the current task type is + // CopTiFlashLocalReadTaskType, the task type of all its children's props is also CopTiFlashLocalReadTaskType. + CopTiFlashLocalReadTaskType + + // CopTiFlashGlobalReadTaskType stands for a TiFlash coprocessor task that reads data globally, + // so all the data of the given table is read in one cop task. If the current task + // type is CopTiFlashGlobalReadTaskType, the task type of all its children's props is also + // CopTiFlashGlobalReadTaskType. + CopTiFlashGlobalReadTaskType ) // String implements fmt.Stringer interface. @@ -38,6 +49,10 @@ func (t TaskType) String() string { return "copSingleReadTask" case CopDoubleReadTaskType: return "copDoubleReadTask" + case CopTiFlashLocalReadTaskType: + return "copTiFlashLocalReadTask" + case CopTiFlashGlobalReadTaskType: + return "copTiFlashGlobalReadTask" } return "UnknownTaskType" } diff --git a/planner/util/path.go b/planner/util/path.go index e12ee41f569b9..e95f17b3adf7c 100644 --- a/planner/util/path.go +++ b/planner/util/path.go @@ -52,6 +52,9 @@ type AccessPath struct { // IsTablePath indicates whether this path is table path. IsTablePath bool + // IsTiFlashGlobalRead indicates whether this path is a remote (global) read path for TiFlash. + IsTiFlashGlobalRead bool + // Forced means this path is generated by `use/force index()`. Forced bool diff --git a/session/session.go b/session/session.go index c4eee97dcd477..492934bd67b0f 100644 --- a/session/session.go +++ b/session/session.go @@ -1944,6 +1944,7 @@ var builtinGlobalVariable = []string{ variable.TiDBEnableIndexMerge, variable.TiDBTxnMode, variable.TiDBAllowBatchCop, + variable.TiDBOptBCJ, variable.TiDBRowFormatVersion, variable.TiDBEnableStmtSummary, variable.TiDBStmtSummaryInternalQuery, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index a9cd2dbd0b918..b10d399a2a258 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -385,6 +385,8 @@ type SessionVars struct { // AllowAggPushDown can be set to false to forbid aggregation push down. AllowAggPushDown bool + // AllowBCJ means whether broadcast join is allowed. + AllowBCJ bool // AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash. AllowDistinctAggPushDown bool @@ -409,6 +411,8 @@ type SessionVars struct { CPUFactor float64 // CopCPUFactor is the CPU cost of processing one expression for one row in coprocessor. CopCPUFactor float64 + // CopTiFlashConcurrencyFactor is the concurrency number of computation in the TiFlash coprocessor. + CopTiFlashConcurrencyFactor float64 // NetworkFactor is the network cost of transferring 1 byte data.
NetworkFactor float64 // ScanFactor is the IO cost of scanning 1 byte data on TiKV and TiFlash. @@ -658,6 +662,7 @@ func NewSessionVars() *SessionVars { Status: mysql.ServerStatusAutocommit, StmtCtx: new(stmtctx.StatementContext), AllowAggPushDown: false, + AllowBCJ: false, OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel, RetryLimit: DefTiDBRetryLimit, DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry, @@ -667,6 +672,7 @@ func NewSessionVars() *SessionVars { CorrelationExpFactor: DefOptCorrelationExpFactor, CPUFactor: DefOptCPUFactor, CopCPUFactor: DefOptCopCPUFactor, + CopTiFlashConcurrencyFactor: DefOptTiFlashConcurrencyFactor, NetworkFactor: DefOptNetworkFactor, ScanFactor: DefOptScanFactor, DescScanFactor: DefOptDescScanFactor, @@ -1076,6 +1082,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.SkipUTF8Check = TiDBOptOn(val) case TiDBOptAggPushDown: s.AllowAggPushDown = TiDBOptOn(val) + case TiDBOptBCJ: + s.AllowBCJ = TiDBOptOn(val) case TiDBOptDistinctAggPushDown: s.AllowDistinctAggPushDown = TiDBOptOn(val) case TiDBOptWriteRowID: @@ -1090,6 +1098,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.CPUFactor = tidbOptFloat64(val, DefOptCPUFactor) case TiDBOptCopCPUFactor: s.CopCPUFactor = tidbOptFloat64(val, DefOptCopCPUFactor) + case TiDBOptTiFlashConcurrencyFactor: + s.CopTiFlashConcurrencyFactor = tidbOptFloat64(val, DefOptTiFlashConcurrencyFactor) case TiDBOptNetworkFactor: s.NetworkFactor = tidbOptFloat64(val, DefOptNetworkFactor) case TiDBOptScanFactor: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 35a294899e247..11ece6ebfbba3 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -611,6 +611,7 @@ var defaultSysVars = []*SysVar{ /* TiDB specific variables */ {ScopeSession, TiDBSnapshot, ""}, {ScopeSession, TiDBOptAggPushDown, BoolToIntStr(DefOptAggPushDown)}, + {ScopeGlobal | ScopeSession, TiDBOptBCJ, BoolToIntStr(DefOptBCJ)}, {ScopeSession, TiDBOptDistinctAggPushDown, BoolToIntStr(config.GetGlobalConfig().Performance.DistinctAggPushDown)}, {ScopeSession, TiDBOptWriteRowID, BoolToIntStr(DefOptWriteRowID)}, {ScopeGlobal | ScopeSession, TiDBBuildStatsConcurrency, strconv.Itoa(DefBuildStatsConcurrency)}, @@ -623,6 +624,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBOptCorrelationThreshold, strconv.FormatFloat(DefOptCorrelationThreshold, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptCorrelationExpFactor, strconv.Itoa(DefOptCorrelationExpFactor)}, {ScopeGlobal | ScopeSession, TiDBOptCPUFactor, strconv.FormatFloat(DefOptCPUFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptTiFlashConcurrencyFactor, strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptCopCPUFactor, strconv.FormatFloat(DefOptCopCPUFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptNetworkFactor, strconv.FormatFloat(DefOptNetworkFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptScanFactor, strconv.FormatFloat(DefOptScanFactor, 'f', -1, 64)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 9649329d63877..f9126b377fddb 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -45,6 +45,7 @@ const ( // tidb_opt_agg_push_down is used to enable/disable the optimizer rule of aggregation push down. 
TiDBOptAggPushDown = "tidb_opt_agg_push_down" + TiDBOptBCJ = "tidb_opt_broadcast_join" // tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash. TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down" @@ -201,6 +202,8 @@ const ( TiDBOptCPUFactor = "tidb_opt_cpu_factor" // tidb_opt_copcpu_factor is the CPU cost of processing one expression for one row in coprocessor. TiDBOptCopCPUFactor = "tidb_opt_copcpu_factor" + // tidb_opt_tiflash_concurrency_factor is the concurrency number of TiFlash computation. + TiDBOptTiFlashConcurrencyFactor = "tidb_opt_tiflash_concurrency_factor" // tidb_opt_network_factor is the network cost of transferring 1 byte data. TiDBOptNetworkFactor = "tidb_opt_network_factor" // tidb_opt_scan_factor is the IO cost of scanning 1 byte data on TiKV. @@ -426,11 +429,13 @@ const ( DefChecksumTableConcurrency = 4 DefSkipUTF8Check = false DefOptAggPushDown = false + DefOptBCJ = false DefOptWriteRowID = false DefOptCorrelationThreshold = 0.9 DefOptCorrelationExpFactor = 1 DefOptCPUFactor = 3.0 DefOptCopCPUFactor = 3.0 + DefOptTiFlashConcurrencyFactor = 24.0 DefOptNetworkFactor = 1.0 DefOptScanFactor = 1.5 DefOptDescScanFactor = 3.0 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index b94f7cbe4042a..e9214009fa7af 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -435,6 +435,11 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc return "ON", nil } return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) + case TiDBOptBCJ: + if (strings.EqualFold(value, "ON") || value == "1") && vars.AllowBatchCop == 0 { + return value, ErrWrongValueForVar.GenWithStackByArgs("Can't set tidb_opt_broadcast_join to 1 while tidb_allow_batch_cop is 0, please enable batch cop first.") + } + return value, nil case TiDBSkipUTF8Check, TiDBOptAggPushDown, TiDBOptDistinctAggPushDown, TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze, TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBEnableChunkRPC, @@ -547,11 +552,15 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc if err != nil { return value, ErrWrongTypeForVar.GenWithStackByArgs(name) } + if v == 0 && vars.AllowBCJ { + return value, ErrWrongValueForVar.GenWithStackByArgs("Can't set tidb_allow_batch_cop to 0 while tidb_opt_broadcast_join is 1, please set tidb_opt_broadcast_join to 0 first.") + } if v < 0 || v > 2 { return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) } return value, nil case TiDBOptCPUFactor, + TiDBOptTiFlashConcurrencyFactor, TiDBOptCopCPUFactor, TiDBOptNetworkFactor, TiDBOptScanFactor, diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 0c24b698e3c88..2444b288bd83a 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -65,6 +65,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.IndexLookupJoinConcurrency, Equals, DefIndexLookupJoinConcurrency) c.Assert(vars.HashJoinConcurrency, Equals, DefTiDBHashJoinConcurrency) c.Assert(vars.AllowBatchCop, Equals, DefTiDBAllowBatchCop) + c.Assert(vars.AllowBCJ, Equals, DefOptBCJ) c.Assert(vars.ProjectionConcurrency, Equals, int64(DefTiDBProjectionConcurrency)) c.Assert(vars.HashAggPartialConcurrency, Equals, DefTiDBHashAggPartialConcurrency) c.Assert(vars.HashAggFinalConcurrency, Equals, DefTiDBHashAggFinalConcurrency) @@ -194,6 +195,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(v.SQLMode, Equals, mode) } + err = SetSessionSystemVar(v, "tidb_opt_broadcast_join", types.NewStringDatum("1")) + c.Assert(err, IsNil) + err = SetSessionSystemVar(v, "tidb_allow_batch_cop", types.NewStringDatum("0")) + c.Assert(terror.ErrorEqual(err, ErrWrongValueForVar), IsTrue) + err = SetSessionSystemVar(v, "tidb_opt_broadcast_join", types.NewStringDatum("0")) + c.Assert(err, IsNil) + err = SetSessionSystemVar(v, "tidb_allow_batch_cop", types.NewStringDatum("0")) + c.Assert(err, IsNil) + err = SetSessionSystemVar(v, "tidb_opt_broadcast_join", types.NewStringDatum("1")) + c.Assert(terror.ErrorEqual(err, ErrWrongValueForVar), IsTrue) + // Combined sql_mode SetSessionSystemVar(v, "sql_mode", types.NewStringDatum("REAL_AS_FLOAT,ANSI_QUOTES")) c.Assert(v.SQLMode, Equals, mysql.ModeRealAsFloat|mysql.ModeANSIQuotes) @@ -316,6 +328,14 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(val, Equals, "5.0") c.Assert(v.CopCPUFactor, Equals, 5.0) + c.Assert(v.CopTiFlashConcurrencyFactor, Equals, 24.0) + err = SetSessionSystemVar(v, TiDBOptTiFlashConcurrencyFactor, types.NewStringDatum("5.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptTiFlashConcurrencyFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "5.0") + c.Assert(v.CopTiFlashConcurrencyFactor, Equals, 5.0) + c.Assert(v.NetworkFactor, Equals, 1.0) err = SetSessionSystemVar(v, TiDBOptNetworkFactor, types.NewStringDatum("3.0")) c.Assert(err, IsNil) @@ -491,6 +511,7 @@ func (s *testVarsutilSuite) TestValidate(c *C) { {TiDBOptCorrelationThreshold, "-2", true}, {TiDBOptCPUFactor, "a", true}, {TiDBOptCPUFactor, "-2", true}, + {TiDBOptTiFlashConcurrencyFactor, "-2", true}, {TiDBOptCopCPUFactor, "a", true}, {TiDBOptCopCPUFactor, "-2", true}, {TiDBOptNetworkFactor, "a", true}, diff --git a/store/mockstore/mocktikv/cop_handler_dag.go b/store/mockstore/mocktikv/cop_handler_dag.go index 5a8d1b953c100..086c797ee093f 100644 --- a/store/mockstore/mocktikv/cop_handler_dag.go +++ b/store/mockstore/mocktikv/cop_handler_dag.go @@ -60,7 +60,7 @@ func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) *coprocessor. resp.RegionError = err return resp } - dagCtx, e, dagReq, err := h.buildDAGExecutor(req) + dagCtx, e, dagReq, err := h.buildDAGExecutor(req, false) if err != nil { resp.OtherError = err.Error() return resp @@ -92,7 +92,7 @@ func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) *coprocessor.
return buildResp(selResp, execDetails, err) } -func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, executor, *tipb.DAGRequest, error) { +func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request, batchCop bool) (*dagContext, executor, *tipb.DAGRequest, error) { if len(req.Ranges) == 0 { return nil, nil, nil, errors.New("request range is null") } @@ -118,7 +118,12 @@ func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, ex startTS: req.StartTs, evalCtx: &evalContext{sc: sc}, } - e, err := h.buildDAG(ctx, dagReq.Executors) + var e executor + if batchCop { + e, err = h.buildDAGForTiFlash(ctx, dagReq.RootExecutor) + } else { + e, err = h.buildDAG(ctx, dagReq.Executors) + } if err != nil { return nil, nil, nil, errors.Trace(err) } @@ -133,7 +138,7 @@ func constructTimeZone(name string, offset int) (*time.Location, error) { } func (h *rpcHandler) handleCopStream(ctx context.Context, req *coprocessor.Request) (tikvpb.Tikv_CoprocessorStreamClient, error) { - dagCtx, e, dagReq, err := h.buildDAGExecutor(req) + dagCtx, e, dagReq, err := h.buildDAGExecutor(req, false) if err != nil { return nil, errors.Trace(err) } @@ -146,9 +151,10 @@ func (h *rpcHandler) handleCopStream(ctx context.Context, req *coprocessor.Reque }, nil } -func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, error) { +func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, *tipb.Executor, error) { var currExec executor var err error + var childExec *tipb.Executor switch curr.GetTp() { case tipb.ExecType_TypeTableScan: currExec, err = h.buildTableScan(ctx, curr) @@ -156,26 +162,46 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, currExec, err = h.buildIndexScan(ctx, curr) case tipb.ExecType_TypeSelection: currExec, err = h.buildSelection(ctx, curr) + childExec = curr.Selection.Child case tipb.ExecType_TypeAggregation: currExec, err = h.buildHashAgg(ctx, curr) + childExec = curr.Aggregation.Child case tipb.ExecType_TypeStreamAgg: currExec, err = h.buildStreamAgg(ctx, curr) + childExec = curr.Aggregation.Child case tipb.ExecType_TypeTopN: currExec, err = h.buildTopN(ctx, curr) + childExec = curr.TopN.Child case tipb.ExecType_TypeLimit: currExec = &limitExec{limit: curr.Limit.GetLimit(), execDetail: new(execDetail)} + childExec = curr.Limit.Child default: // TODO: Support other types. 
err = errors.Errorf("this exec type %v doesn't support yet.", curr.GetTp()) } - return currExec, errors.Trace(err) + return currExec, childExec, errors.Trace(err) +} + +func (h *rpcHandler) buildDAGForTiFlash(ctx *dagContext, parent *tipb.Executor) (executor, error) { + curr, child, err := h.buildExec(ctx, parent) + if err != nil { + return nil, errors.Trace(err) + } + if child != nil { + childExec, err := h.buildDAGForTiFlash(ctx, child) + if err != nil { + return nil, errors.Trace(err) + } + curr.SetSrcExec(childExec) + } + return curr, nil } func (h *rpcHandler) buildDAG(ctx *dagContext, executors []*tipb.Executor) (executor, error) { var src executor for i := 0; i < len(executors); i++ { - curr, err := h.buildExec(ctx, executors[i]) + curr, _, err := h.buildExec(ctx, executors[i]) if err != nil { return nil, errors.Trace(err) } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index f2983e108983d..8d0fb5ce39c98 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -689,7 +689,7 @@ func (h *rpcHandler) handleBatchCopRequest(ctx context.Context, req *coprocessor. StartTs: req.StartTs, Ranges: ri.Ranges, } - _, exec, dagReq, err := h.buildDAGExecutor(&cop) + _, exec, dagReq, err := h.buildDAGExecutor(&cop, true) if err != nil { return nil, errors.Trace(err) } diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go index a4530c2521c9d..0ee09c06f24da 100644 --- a/store/tikv/batch_coprocessor.go +++ b/store/tikv/batch_coprocessor.go @@ -64,7 +64,7 @@ func (rs *batchCopResponse) GetStartKey() kv.Key { // GetExecDetails is unavailable currently, because TiFlash has not collected exec details for batch cop. // TODO: Will fix in near future. func (rs *batchCopResponse) GetExecDetails() *execdetails.ExecDetails { - return &execdetails.ExecDetails{} + return rs.detail } // MemSize returns how many bytes of memory this response use @@ -304,7 +304,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task * for idx := 0; idx < len(tasks); idx++ { ret, err := b.handleTaskOnce(ctx, bo, tasks[idx]) if err != nil { - resp := &batchCopResponse{err: errors.Trace(err)} + resp := &batchCopResponse{err: errors.Trace(err), detail: new(execdetails.ExecDetails)} b.sendToRespCh(resp) break } @@ -413,9 +413,22 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro return errors.Trace(err) } - b.sendToRespCh(&batchCopResponse{ + resp := batchCopResponse{ pbResp: response, - }) + detail: new(execdetails.ExecDetails), + } + + resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond + resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes)) + resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes)) + for backoff := range bo.backoffTimes { + backoffName := backoff.String() + resp.detail.BackoffTimes[backoffName] = bo.backoffTimes[backoff] + resp.detail.BackoffSleep[backoffName] = time.Duration(bo.backoffSleepMS[backoff]) * time.Millisecond + } + resp.detail.CalleeAddress = task.storeAddr + + b.sendToRespCh(&resp) return } diff --git a/util/plancodec/id.go b/util/plancodec/id.go index b460bf86e70ec..25cc7b0e35a24 100644 --- a/util/plancodec/id.go +++ b/util/plancodec/id.go @@ -52,6 +52,8 @@ const ( TypeLimit = "Limit" // TypeHashJoin is the type of hash join. TypeHashJoin = "HashJoin" + // TypeBroadcastJoin is the type of broadcast join. + TypeBroadcastJoin = "BroadcastJoin" // TypeMergeJoin is the type of merge join.
TypeMergeJoin = "MergeJoin" // TypeIndexJoin is the type of index look up join.
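Editor's note: taken together, this patch gates broadcast join behind two session variables and two optimizer hints. ValidateSetSystemVar rejects enabling tidb_opt_broadcast_join while tidb_allow_batch_cop is 0 and rejects disabling batch cop while broadcast join is still on, and the broadcast_join()/broadcast_join_local() hints (used by the TestBroadcastJoin cases) choose which tables join inside TiFlash and which side is read locally. The sketch below is a hedged usage illustration only, not part of the patch: the MySQL driver import, DSN, and column handling are assumptions, and fact_t/d1_t stand in for any TiFlash-replicated tables.

package main

import (
	"database/sql"
	"fmt"
	"log"
	"strings"

	_ "github.com/go-sql-driver/mysql" // placeholder driver choice
)

func main() {
	// Placeholder DSN for a local TiDB instance.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Order matters: batch cop has to be on before broadcast join may be enabled,
	// mirroring the checks added to ValidateSetSystemVar in this patch.
	for _, stmt := range []string{
		"SET SESSION tidb_allow_batch_cop = 1",
		"SET SESSION tidb_opt_broadcast_join = 1",
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}

	// The hint asks the planner for a broadcast join between the two tables.
	rows, err := db.Query("EXPLAIN SELECT /*+ broadcast_join(fact_t, d1_t) */ count(*) " +
		"FROM fact_t, d1_t WHERE fact_t.d1_k = d1_t.d1_k")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	// Scan generically so the sketch does not depend on the exact EXPLAIN column layout.
	cols, err := rows.Columns()
	if err != nil {
		log.Fatal(err)
	}
	raw := make([]sql.RawBytes, len(cols))
	vals := make([]interface{}, len(cols))
	for i := range raw {
		vals[i] = &raw[i]
	}
	for rows.Next() {
		if err := rows.Scan(vals...); err != nil {
			log.Fatal(err)
		}
		parts := make([]string, len(raw))
		for i, b := range raw {
			parts[i] = string(b)
		}
		// Expect BroadcastJoin operators running as cop[tiflash] in the printed plan.
		fmt.Println(strings.Join(parts, " | "))
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}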