From 6301f010c9318d54815d7a8db76967b0e9381cb7 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Mon, 27 Jul 2020 12:41:36 +0800 Subject: [PATCH 1/4] cherry pick #17232 to release-4.0 Signed-off-by: ti-srebot --- distsql/select_result.go | 9 +- executor/builder.go | 39 +++-- executor/table_readers_required_rows_test.go | 2 +- go.mod | 8 + go.sum | 6 + planner/core/common_plans.go | 2 + planner/core/exhaust_physical_plans.go | 138 +++++++++++++++--- planner/core/explain.go | 3 + planner/core/find_best_task.go | 41 +++++- planner/core/hints.go | 2 + planner/core/initialize.go | 26 +++- planner/core/integration_test.go | 67 +++++++++ planner/core/logical_plan_builder.go | 65 +++++++-- planner/core/logical_plans.go | 1 + planner/core/physical_plans.go | 48 ++++++ planner/core/plan.go | 3 +- planner/core/plan_to_pb.go | 132 +++++++++++++++-- planner/core/planbuilder.go | 67 ++++++++- planner/core/point_get_plan.go | 4 +- planner/core/resolve_indices.go | 43 ++++++ planner/core/task.go | 72 ++++++++- .../testdata/integration_serial_suite_in.json | 9 ++ .../integration_serial_suite_out.json | 69 +++++++++ planner/property/physical_property.go | 18 +++ planner/property/task_type.go | 15 ++ planner/util/path.go | 9 ++ session/session.go | 1 + sessionctx/variable/session.go | 10 ++ sessionctx/variable/sysvar.go | 2 + sessionctx/variable/tidb_vars.go | 5 + sessionctx/variable/varsutil.go | 14 ++ sessionctx/variable/varsutil_test.go | 35 +++++ store/mockstore/mocktikv/cop_handler_dag.go | 40 ++++- store/mockstore/mocktikv/rpc.go | 41 ++++++ store/tikv/batch_coprocessor.go | 21 ++- util/plancodec/id.go | 2 + 36 files changed, 981 insertions(+), 88 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 693f3ef33f277..ecc3a22723998 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -250,9 +250,14 @@ func (r *selectResult) updateCopRuntimeStats(detail *execdetails.ExecDetails, re for i, detail := range r.selectResp.GetExecutionSummaries() { if detail != nil && detail.TimeProcessedNs != nil && detail.NumProducedRows != nil && detail.NumIterations != nil { - planID := r.copPlanIDs[i] + planID := "" + if detail.GetExecutorId() != "" { + planID = detail.GetExecutorId() + } else { + planID = r.copPlanIDs[i].String() + } r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl. 
- RecordOneCopTask(planID.String(), callee, detail) + RecordOneCopTask(planID, callee, detail) } } } diff --git a/executor/builder.go b/executor/builder.go index 63d1a2d5059eb..1e5e6abbfce8b 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1990,7 +1990,7 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan streaming := true executors := make([]*tipb.Executor, 0, len(plans)) for _, p := range plans { - execPB, err := p.ToPB(sctx) + execPB, err := p.ToPB(sctx, kv.TiKV) if err != nil { return nil, false, err } @@ -2012,7 +2012,13 @@ func markChildrenUsedCols(outputSchema *expression.Schema, childSchema ...*expre return } -func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { +func constructDistExecForTiFlash(sctx sessionctx.Context, p plannercore.PhysicalPlan) ([]*tipb.Executor, bool, error) { + execPB, err := p.ToPB(sctx, kv.TiFlash) + return []*tipb.Executor{execPB}, false, err + +} + +func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan, storeType kv.StoreType) (dagReq *tipb.DAGRequest, streaming bool, err error) { dagReq = &tipb.DAGRequest{} dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(b.ctx.GetSessionVars().Location()) sc := b.ctx.GetSessionVars().StmtCtx @@ -2021,7 +2027,13 @@ func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dag dagReq.CollectExecutionSummaries = &collExec } dagReq.Flags = sc.PushDownFlags() - dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans) + if storeType == kv.TiFlash { + var executors []*tipb.Executor + executors, streaming, err = constructDistExecForTiFlash(b.ctx, plans[0]) + dagReq.RootExecutor = executors[0] + } else { + dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans) + } distsql.SetEncodeType(b.ctx, dagReq) return dagReq, streaming, err @@ -2251,7 +2263,7 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) { case 1: for _, p := range v.TablePlans { switch p.(type) { - case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN: + case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN, *plannercore.PhysicalBroadCastJoin: e.batchCop = true } } @@ -2262,11 +2274,15 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) { } func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) { - dagReq, streaming, err := b.constructDAGReq(v.TablePlans) + tablePlans := v.TablePlans + if v.StoreType == kv.TiFlash { + tablePlans = []plannercore.PhysicalPlan{v.GetTablePlan()} + } + dagReq, streaming, err := b.constructDAGReq(tablePlans, v.StoreType) if err != nil { return nil, err } - ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + ts := v.GetTableScan() tbl, _ := b.is.TableByID(ts.Table.ID) isPartition, physicalTableID := ts.IsPartition() if isPartition { @@ -2333,7 +2349,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) * return nil } - ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) + ts := v.GetTableScan() ret.ranges = ts.Ranges sctx := b.ctx.GetSessionVars().StmtCtx sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID) @@ -2341,7 +2357,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) * } func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, 
error) { - dagReq, streaming, err := b.constructDAGReq(v.IndexPlans) + dagReq, streaming, err := b.constructDAGReq(v.IndexPlans, kv.TiKV) if err != nil { return nil, err } @@ -2415,7 +2431,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) * } func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, val table.Table, err error) { - tableReq, tableStreaming, err := b.constructDAGReq(plans) + tableReq, tableStreaming, err := b.constructDAGReq(plans, kv.TiKV) if err != nil { return nil, false, nil, err } @@ -2432,8 +2448,13 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic return tableReq, tableStreaming, tbl, err } +<<<<<<< HEAD func buildIndexReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { indexReq, indexStreaming, err := b.constructDAGReq(plans) +======= +func buildIndexReq(b *executorBuilder, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { + indexReq, indexStreaming, err := b.constructDAGReq(plans, kv.TiKV) +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) if err != nil { return nil, false, err } diff --git a/executor/table_readers_required_rows_test.go b/executor/table_readers_required_rows_test.go index 9e5aab6b53fa1..095a9c5ba68a7 100644 --- a/executor/table_readers_required_rows_test.go +++ b/executor/table_readers_required_rows_test.go @@ -128,7 +128,7 @@ func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest { Columns: []*model.ColumnInfo{}, Table: &model.TableInfo{ID: 12345, PKIsHandle: false}, Desc: false, - }}) + }}, kv.TiKV) if err != nil { panic(err) } diff --git a/go.mod b/go.mod index b9509f380195e..7d29addbaf88f 100644 --- a/go.mod +++ b/go.mod @@ -32,11 +32,19 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c github.com/pingcap/log v0.0.0-20200511115504-543df19646ad +<<<<<<< HEAD github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181 github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1 github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce +======= + github.com/pingcap/parser v0.0.0-20200623082809-b74301ac298b + github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200714122454-1a64f969cb3c + github.com/pingcap/sysutil v0.0.0-20200715082929-4c47bcac246a + github.com/pingcap/tidb-tools v4.0.1+incompatible + github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. 
(#17232) github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 diff --git a/go.sum b/go.sum index 0a84b4b1d4d8b..61ee701293002 100644 --- a/go.sum +++ b/go.sum @@ -439,8 +439,14 @@ github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +<<<<<<< HEAD github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce h1:LDyY6Xh/Z/SHVQ10erWtoOwIxHSTtlpPQ9cvS+BfRMY= github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +======= +github.com/pingcap/tipb v0.0.0-20200604070248-508f03b0b342/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 h1:ESL3eIt1kUt8IMvR1011ejZlAyDcOzw89ARvVHvpD5k= +github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 7319fd18c216c..580897ba9b528 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -915,6 +915,8 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, driverSide, indent st buildSide = plan.InnerChildIdx ^ 1 case *PhysicalIndexHashJoin: buildSide = plan.InnerChildIdx ^ 1 + case *PhysicalBroadCastJoin: + buildSide = plan.InnerChildIdx } if buildSide != -1 { diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 34e3259bbb19b..4619811b7c969 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/ranger" @@ -39,6 +40,9 @@ import ( ) func (p *LogicalUnionScan) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { + if prop.IsFlashOnlyProp() { + return nil, true + } childProp := prop.Clone() us := PhysicalUnionScan{ Conditions: p.conditions, @@ -1432,11 +1436,25 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P } }) + if prop.IsFlashOnlyProp() && ((p.preferJoinType&preferBCJoin) == 0 && p.preferJoinType > 0) { + return nil, false + } + joins := make([]PhysicalPlan, 0, 8) + if p.ctx.GetSessionVars().AllowBCJ { + broadCastJoins := p.tryToGetBroadCastJoin(prop) + if (p.preferJoinType & preferBCJoin) > 0 { + return broadCastJoins, true + } + joins = append(joins, broadCastJoins...) 
+ } + if prop.IsFlashOnlyProp() { + return joins, true + } + mergeJoins := p.GetMergeJoin(prop, p.schema, p.Stats(), p.children[0].statsInfo(), p.children[1].statsInfo()) if (p.preferJoinType&preferMergeJoin) > 0 && len(mergeJoins) > 0 { return mergeJoins, true } - joins := make([]PhysicalPlan, 0, 5) joins = append(joins, mergeJoins...) indexJoins, forced := p.tryToGetIndexJoin(prop) @@ -1460,10 +1478,72 @@ func (p *LogicalJoin) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P return joins, true } +func (p *LogicalJoin) tryToGetBroadCastJoin(prop *property.PhysicalProperty) []PhysicalPlan { + /// todo remove this restriction after join on new collation is supported in TiFlash + if collate.NewCollationEnabled() { + return nil + } + if !prop.IsEmpty() { + return nil + } + if prop.TaskTp != property.RootTaskType && !prop.IsFlashOnlyProp() { + return nil + } + + if p.JoinType != InnerJoin || len(p.LeftConditions) != 0 || len(p.RightConditions) != 0 || len(p.OtherConditions) != 0 || len(p.EqualConditions) == 0 { + return nil + } + + if hasPrefer, idx := p.getPreferredBCJLocalIndex(); hasPrefer { + return p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1-idx) + } + results := p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 0) + results = append(results, p.tryToGetBroadCastJoinByPreferGlobalIdx(prop, 1)...) + return results +} + +func (p *LogicalJoin) tryToGetBroadCastJoinByPreferGlobalIdx(prop *property.PhysicalProperty, preferredGlobalIndex int) []PhysicalPlan { + lkeys, rkeys := p.GetJoinKeys() + baseJoin := basePhysicalJoin{ + JoinType: p.JoinType, + LeftConditions: p.LeftConditions, + RightConditions: p.RightConditions, + DefaultValues: p.DefaultValues, + LeftJoinKeys: lkeys, + RightJoinKeys: rkeys, + } + + preferredBuildIndex := 0 + if p.children[0].statsInfo().Count() > p.children[1].statsInfo().Count() { + preferredBuildIndex = 1 + } + baseJoin.InnerChildIdx = preferredBuildIndex + childrenReqProps := make([]*property.PhysicalProperty, 2) + childrenReqProps[preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType, ExpectedCnt: math.MaxFloat64} + if prop.TaskTp == property.CopTiFlashGlobalReadTaskType { + childrenReqProps[1-preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashGlobalReadTaskType, ExpectedCnt: math.MaxFloat64} + } else { + childrenReqProps[1-preferredGlobalIndex] = &property.PhysicalProperty{TaskTp: property.CopTiFlashLocalReadTaskType, ExpectedCnt: math.MaxFloat64} + } + if prop.ExpectedCnt < p.stats.RowCount { + expCntScale := prop.ExpectedCnt / p.stats.RowCount + childrenReqProps[1-baseJoin.InnerChildIdx].ExpectedCnt = p.children[1-baseJoin.InnerChildIdx].statsInfo().RowCount * expCntScale + } + + join := PhysicalBroadCastJoin{ + basePhysicalJoin: baseJoin, + globalChildIndex: preferredGlobalIndex, + }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, childrenReqProps...) + return []PhysicalPlan{join} +} + // TryToGetChildProp will check if this sort property can be pushed or not. // When a sort column will be replaced by scalar function, we refuse it. // When a sort column will be replaced by a constant, we just remove it. 
func (p *LogicalProjection) TryToGetChildProp(prop *property.PhysicalProperty) (*property.PhysicalProperty, bool) { + if prop.IsFlashOnlyProp() { + return nil, false + } newProp := &property.PhysicalProperty{TaskTp: property.RootTaskType, ExpectedCnt: prop.ExpectedCnt} newCols := make([]property.Item, 0, len(prop.Items)) for _, col := range prop.Items { @@ -1493,9 +1573,10 @@ func (p *LogicalProjection) exhaustPhysicalPlans(prop *property.PhysicalProperty return []PhysicalPlan{proj}, true } -func (lt *LogicalTopN) getPhysTopN() []PhysicalPlan { - ret := make([]PhysicalPlan, 0, 3) - for _, tp := range wholeTaskTypes { +func (lt *LogicalTopN) getPhysTopN(prop *property.PhysicalProperty) []PhysicalPlan { + allTaskTypes := prop.GetAllPossibleChildTaskTypes() + ret := make([]PhysicalPlan, 0, len(allTaskTypes)) + for _, tp := range allTaskTypes { resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: math.MaxFloat64} topN := PhysicalTopN{ ByItems: lt.ByItems, @@ -1507,14 +1588,15 @@ func (lt *LogicalTopN) getPhysTopN() []PhysicalPlan { return ret } -func (lt *LogicalTopN) getPhysLimits() []PhysicalPlan { - prop, canPass := GetPropByOrderByItems(lt.ByItems) +func (lt *LogicalTopN) getPhysLimits(prop *property.PhysicalProperty) []PhysicalPlan { + p, canPass := GetPropByOrderByItems(lt.ByItems) if !canPass { return nil } + allTaskTypes := prop.GetAllPossibleChildTaskTypes() ret := make([]PhysicalPlan, 0, 3) - for _, tp := range wholeTaskTypes { - resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(lt.Count + lt.Offset), Items: prop.Items} + for _, tp := range allTaskTypes { + resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(lt.Count + lt.Offset), Items: p.Items} limit := PhysicalLimit{ Count: lt.Count, Offset: lt.Offset, @@ -1540,7 +1622,7 @@ func MatchItems(p *property.PhysicalProperty, items []*util.ByItems) bool { func (lt *LogicalTopN) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { if MatchItems(prop, lt.ByItems) { - return append(lt.getPhysTopN(), lt.getPhysLimits()...), true + return append(lt.getPhysTopN(prop), lt.getPhysLimits(prop)...), true } return nil, true } @@ -1551,7 +1633,7 @@ func (la *LogicalApply) GetHashJoin(prop *property.PhysicalProperty) *PhysicalHa } func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { - if !prop.AllColsFromSchema(la.children[0].Schema()) { // for convenient, we don't pass through any prop + if !prop.AllColsFromSchema(la.children[0].Schema()) || prop.IsFlashOnlyProp() { // for convenient, we don't pass through any prop return nil, true } join := la.GetHashJoin(prop) @@ -1568,6 +1650,9 @@ func (la *LogicalApply) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([ } func (p *LogicalWindow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { + if prop.IsFlashOnlyProp() { + return nil, true + } var byItems []property.Item byItems = append(byItems, p.PartitionBy...) byItems = append(byItems, p.OrderBy...) 
@@ -1601,8 +1686,12 @@ func (la *LogicalAggregation) canPushToCop() bool { } func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan { + if prop.IsFlashOnlyProp() { + return nil + } _, desc := prop.AllSameOrder() - enforcedAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes)) + allTaskTypes := prop.GetAllPossibleChildTaskTypes() + enforcedAggs := make([]PhysicalPlan, 0, len(allTaskTypes)) childProp := &property.PhysicalProperty{ ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt), Enforced: true, @@ -1650,6 +1739,10 @@ func (la *LogicalAggregation) distinctArgsMeetsProperty() bool { } func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan { + // TODO: support CopTiFlash task type in stream agg + if prop.IsFlashOnlyProp() { + return nil + } all, desc := prop.AllSameOrder() if !all { return nil @@ -1665,7 +1758,8 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P return nil } - streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(wholeTaskTypes)-1)+len(wholeTaskTypes)) + allTaskTypes := prop.GetAllPossibleChildTaskTypes() + streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(allTaskTypes)-1)+len(allTaskTypes)) childProp := &property.PhysicalProperty{ ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt), } @@ -1716,8 +1810,11 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy if !prop.IsEmpty() { return nil } - hashAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes)) + hashAggs := make([]PhysicalPlan, 0, len(prop.GetAllPossibleChildTaskTypes())) taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopDoubleReadTaskType} + if la.ctx.GetSessionVars().AllowBCJ { + taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType) + } if la.HasDistinct() { // TODO: remove AllowDistinctAggPushDown after the cost estimation of distinct pushdown is implemented. // If AllowDistinctAggPushDown is set to true, we should not consider RootTask. 
@@ -1727,6 +1824,9 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy } else if !la.aggHints.preferAggToCop { taskTypes = append(taskTypes, property.RootTaskType) } + if prop.IsFlashOnlyProp() { + taskTypes = []property.TaskType{prop.TaskTp} + } for _, taskTp := range taskTypes { agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, TaskTp: taskTp}) agg.SetSchema(la.schema.Clone()) @@ -1795,8 +1895,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] if !prop.IsEmpty() { return nil, true } - ret := make([]PhysicalPlan, 0, len(wholeTaskTypes)) - for _, tp := range wholeTaskTypes { + allTaskTypes := prop.GetAllPossibleChildTaskTypes() + ret := make([]PhysicalPlan, 0, len(allTaskTypes)) + for _, tp := range allTaskTypes { resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(p.Count + p.Offset)} limit := PhysicalLimit{ Offset: p.Offset, @@ -1808,6 +1909,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] } func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { + if prop.IsFlashOnlyProp() { + return nil, true + } childProp := prop.Clone() lock := PhysicalLock{ Lock: p.Lock, @@ -1819,7 +1923,7 @@ func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]P func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { // TODO: UnionAll can not pass any order, but we can change it to sort merge to keep order. - if !prop.IsEmpty() { + if !prop.IsEmpty() || prop.IsFlashOnlyProp() { return nil, true } chReqProps := make([]*property.PhysicalProperty, 0, len(p.children)) @@ -1869,7 +1973,7 @@ func (ls *LogicalSort) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] } func (p *LogicalMaxOneRow) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]PhysicalPlan, bool) { - if !prop.IsEmpty() { + if !prop.IsEmpty() || prop.IsFlashOnlyProp() { return nil, true } mor := PhysicalMaxOneRow{}.Init(p.ctx, p.stats, p.blockOffset, &property.PhysicalProperty{ExpectedCnt: 2}) diff --git a/planner/core/explain.go b/planner/core/explain.go index 71f5a0b3fbbd9..e0f74ee827a45 100644 --- a/planner/core/explain.go +++ b/planner/core/explain.go @@ -214,6 +214,9 @@ func (p *PhysicalTableScan) OperatorInfo(normalized bool) string { if p.stats.StatsVersion == statistics.PseudoVersion && !normalized { buffer.WriteString("stats:pseudo, ") } + if p.IsGlobalRead { + buffer.WriteString("global read, ") + } buffer.Truncate(buffer.Len() - 2) return buffer.String() } diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index a7fa84cb76eb6..23ae8ac04601c 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -160,7 +160,17 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPl // combine best child tasks with parent physical plan. curTask := pp.attach2Task(childTasks...) +<<<<<<< HEAD // enforce curTask property +======= + if prop.IsFlashOnlyProp() { + if _, ok := curTask.(*copTask); !ok { + continue + } + } + + // Enforce curTask property +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. 
(#17232) if prop.Enforced { curTask = enforceProperty(prop, curTask, p.basePlan.ctx) } @@ -192,7 +202,7 @@ func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTas return bestTask, nil } - if prop.TaskTp != property.RootTaskType { + if prop.TaskTp != property.RootTaskType && prop.TaskTp != property.CopTiFlashLocalReadTaskType && prop.TaskTp != property.CopTiFlashGlobalReadTaskType { // Currently all plan cannot totally push down. p.storeTask(prop, invalidTask) return invalidTask, nil @@ -402,9 +412,31 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida if len(path.Ranges) == 0 && !ds.ctx.GetSessionVars().StmtCtx.UseCache { return []*candidatePath{{path: path}} } + if path.StoreType != kv.TiFlash && (prop.TaskTp == property.CopTiFlashLocalReadTaskType || prop.TaskTp == property.CopTiFlashGlobalReadTaskType) { + continue + } var currentCandidate *candidatePath +<<<<<<< HEAD if path.IsTablePath { currentCandidate = ds.getTableCandidate(path, prop) +======= + if path.IsTablePath() { + if path.StoreType == kv.TiFlash { + if path.IsTiFlashGlobalRead && prop.TaskTp == property.CopTiFlashGlobalReadTaskType { + currentCandidate = ds.getTableCandidate(path, prop) + } + if !path.IsTiFlashGlobalRead && prop.TaskTp != property.CopTiFlashGlobalReadTaskType { + currentCandidate = ds.getTableCandidate(path, prop) + } + } else { + if !path.IsTiFlashGlobalRead && !prop.IsFlashOnlyProp() { + currentCandidate = ds.getTableCandidate(path, prop) + } + } + if currentCandidate == nil { + continue + } +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) } else { coveredByIdx := isCoveringIndex(ds.schema.Columns, path.FullIdxCols, path.FullIdxColLens, ds.tableInfo.PKIsHandle) if len(path.AccessConds) > 0 || !prop.IsEmpty() || path.Forced || coveredByIdx { @@ -1169,6 +1201,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid copTask.keepOrder = true } ts.addPushedDownSelection(copTask, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt)) + if prop.IsFlashOnlyProp() && len(copTask.rootTaskConds) != 0 { + return invalidTask, nil + } if prop.TaskTp == property.RootTaskType { task = finishCopTask(ds.ctx, task) } else if _, ok := task.(*rootTask); ok { @@ -1345,6 +1380,7 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper AccessCondition: path.AccessConds, filterCondition: path.TableFilters, StoreType: path.StoreType, + IsGlobalRead: path.IsTiFlashGlobalRead, }.Init(ds.ctx, ds.blockOffset) ts.SetSchema(ds.schema.Clone()) if ts.Table.PKIsHandle { @@ -1388,6 +1424,9 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper } sessVars := ds.ctx.GetSessionVars() cost := rowCount * rowSize * sessVars.ScanFactor + if ts.IsGlobalRead { + cost += rowCount * sessVars.NetworkFactor * rowSize + } if isMatchProp { if prop.Items[0].Desc { ts.Desc = true diff --git a/planner/core/hints.go b/planner/core/hints.go index d8d8f6ff80d92..86019b204c334 100644 --- a/planner/core/hints.go +++ b/planner/core/hints.go @@ -166,6 +166,8 @@ func genHintsFromPhysicalPlan(p PhysicalPlan, nodeType utilhint.NodeType) (res [ }) case *PhysicalMergeJoin: res = append(res, getJoinHints(p.SCtx(), HintSMJ, p.SelectBlockOffset(), nodeType, pp.children...)...) + case *PhysicalBroadCastJoin: + res = append(res, getJoinHints(p.SCtx(), HintBCJ, p.SelectBlockOffset(), nodeType, pp.children...)...) 
case *PhysicalHashJoin: res = append(res, getJoinHints(p.SCtx(), HintHJ, p.SelectBlockOffset(), nodeType, pp.children...)...) case *PhysicalIndexJoin: res = append(res, getJoinHints(p.SCtx(), HintINLJ, p.SelectBlockOffset(), nodeType, pp.children...)...) diff --git a/planner/core/initialize.go b/planner/core/initialize.go index b7e488a4510a7..99f2fe2d7b05b 100644 --- a/planner/core/initialize.go +++ b/planner/core/initialize.go @@ -318,6 +318,16 @@ func (p PhysicalHashJoin) Init(ctx sessionctx.Context, stats *property.StatsInfo return &p } +// Init initializes PhysicalBroadCastJoin. +func (p PhysicalBroadCastJoin) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalBroadCastJoin { + tp := plancodec.TypeBroadcastJoin + p.basePhysicalPlan = newBasePhysicalPlan(ctx, tp, &p, offset) + p.childrenReqProps = props + p.stats = stats + return &p + +} + // Init initializes PhysicalMergeJoin. func (p PhysicalMergeJoin) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int) *PhysicalMergeJoin { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeMergeJoin, &p, offset) @@ -459,16 +469,18 @@ func (p BatchPointGetPlan) Init(ctx sessionctx.Context, stats *property.StatsInf return &p } +func flattenTreePlan(plan PhysicalPlan, plans []PhysicalPlan) []PhysicalPlan { + plans = append(plans, plan) + for _, child := range plan.Children() { + plans = flattenTreePlan(child, plans) + } + return plans +} + // flattenPushDownPlan converts a plan tree to a list, whose head is the leaf node like table scan. func flattenPushDownPlan(p PhysicalPlan) []PhysicalPlan { plans := make([]PhysicalPlan, 0, 5) - for { - plans = append(plans, p) - if len(p.Children()) == 0 { - break - } - p = p.Children()[0] - } + plans = flattenTreePlan(p, plans) for i := 0; i < len(plans)/2; i++ { j := len(plans) - i - 1 plans[i], plans[j] = plans[j], plans[i] diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index de08a3b84176c..6fce298db3e46 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -344,6 +344,73 @@ func (s *testIntegrationSerialSuite) TestSelPushDownTiFlash(c *C) { } } +func (s *testIntegrationSerialSuite) TestBroadcastJoin(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists d1_t") + tk.MustExec("create table d1_t(d1_k int, value int)") + tk.MustExec("insert into d1_t values(1,2),(2,3)") + tk.MustExec("analyze table d1_t") + tk.MustExec("drop table if exists d2_t") + tk.MustExec("create table d2_t(d2_k decimal(10,2), value int)") + tk.MustExec("insert into d2_t values(10.11,2),(10.12,3)") + tk.MustExec("analyze table d2_t") + tk.MustExec("drop table if exists d3_t") + tk.MustExec("create table d3_t(d3_k date, value int)") + tk.MustExec("insert into d3_t values(date'2010-01-01',2),(date'2010-01-02',3)") + tk.MustExec("analyze table d3_t") + tk.MustExec("drop table if exists fact_t") + tk.MustExec("create table fact_t(d1_k int, d2_k decimal(10,2), d3_k date, col1 int, col2 int, col3 int)") + tk.MustExec("insert into fact_t values(1,10.11,date'2010-01-01',1,2,3),(1,10.11,date'2010-01-02',1,2,3),(1,10.12,date'2010-01-01',1,2,3),(1,10.12,date'2010-01-02',1,2,3)") + tk.MustExec("insert into fact_t values(2,10.11,date'2010-01-01',1,2,3),(2,10.11,date'2010-01-02',1,2,3),(2,10.12,date'2010-01-01',1,2,3),(2,10.12,date'2010-01-02',1,2,3)") + tk.MustExec("analyze table fact_t") + + // Create virtual tiflash replica info.
+ dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + c.Assert(exists, IsTrue) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "fact_t" || tblInfo.Name.L == "d1_t" || tblInfo.Name.L == "d2_t" || tblInfo.Name.L == "d3_t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@session.tidb_isolation_read_engines = 'tiflash'") + tk.MustExec("set @@session.tidb_allow_batch_cop = 1") + tk.MustExec("set @@session.tidb_opt_broadcast_join = 1") + var input []string + var output []struct { + SQL string + Plan []string + } + s.testData.GetTestCases(c, &input, &output) + for i, tt := range input { + s.testData.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } + + // outer join not supported + _, err := tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t) */ count(*) from fact_t left join d1_t on fact_t.d1_k = d1_t.d1_k") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query") + // join with non-equal condition not supported + _, err = tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t) */ count(*) from fact_t join d1_t on fact_t.d1_k = d1_t.d1_k and fact_t.col1 > d1_t.value") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query") + // cartesian join not supported + _, err = tk.Exec("explain select /*+ broadcast_join(fact_t, d1_t) */ count(*) from fact_t join d1_t") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[planner:1815]Internal : Can't find a proper physical plan for this query") +} + func (s *testIntegrationSerialSuite) TestAggPushDownEngine(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index df1c0fba28b2d..ea125bb910312 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -59,6 +59,15 @@ const ( TiDBMergeJoin = "tidb_smj" // HintSMJ is hint enforce merge join. HintSMJ = "merge_join" + + // TiDBBroadCastJoin indicates applying broadcast join by force. + TiDBBroadCastJoin = "tidb_bcj" + + // HintBCJ indicates applying broadcast join by force. + HintBCJ = "broadcast_join" + // HintBCJPreferLocal specifies the preferred local read table. + HintBCJPreferLocal = "broadcast_join_local" + // TiDBIndexNestedLoopJoin is hint enforce index nested loop join. TiDBIndexNestedLoopJoin = "tidb_inlj" // HintINLJ is hint enforce index nested loop join.
@@ -460,6 +469,19 @@ func extractTableAlias(p Plan, parentOffset int) *hintTableInfo { return nil } +func (p *LogicalJoin) getPreferredBCJLocalIndex() (hasPrefer bool, prefer int) { + if p.hintInfo == nil { + return + } + if p.hintInfo.ifPreferAsLocalInBCJoin(p.children[0], p.blockOffset) { + return true, 0 + } + if p.hintInfo.ifPreferAsLocalInBCJoin(p.children[1], p.blockOffset) { + return true, 1 + } + return false, 0 +} + func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) { if hintInfo == nil { return @@ -470,6 +492,9 @@ func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) { if hintInfo.ifPreferMergeJoin(lhsAlias, rhsAlias) { p.preferJoinType |= preferMergeJoin } + if hintInfo.ifPreferBroadcastJoin(lhsAlias, rhsAlias) { + p.preferJoinType |= preferBCJoin + } if hintInfo.ifPreferHashJoin(lhsAlias, rhsAlias) { p.preferJoinType |= preferHashJoin } @@ -2262,11 +2287,11 @@ func (b *PlanBuilder) pushHintWithoutTableWarning(hint *ast.TableOptimizerHint) func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType utilhint.NodeType, currentLevel int) { hints = b.hintProcessor.GetCurrentStmtHints(hints, nodeType, currentLevel) var ( - sortMergeTables, INLJTables, INLHJTables, INLMJTables, hashJoinTables []hintTableInfo - indexHintList, indexMergeHintList []indexHintInfo - tiflashTables, tikvTables []hintTableInfo - aggHints aggHintInfo - timeRangeHint ast.HintTimeRange + sortMergeTables, INLJTables, INLHJTables, INLMJTables, hashJoinTables, BCTables, BCJPreferLocalTables []hintTableInfo + indexHintList, indexMergeHintList []indexHintInfo + tiflashTables, tikvTables []hintTableInfo + aggHints aggHintInfo + timeRangeHint ast.HintTimeRange ) for _, hint := range hints { // Set warning for the hint that requires the table name. @@ -2281,7 +2306,15 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType u switch hint.HintName.L { case TiDBMergeJoin, HintSMJ: +<<<<<<< HEAD sortMergeTables = append(sortMergeTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) +======= + sortMergeTables = append(sortMergeTables, tableNames2HintTableInfo(b.ctx, hint.HintName.L, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) + case TiDBBroadCastJoin, HintBCJ: + BCTables = append(BCTables, tableNames2HintTableInfo(b.ctx, hint.HintName.L, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) + case HintBCJPreferLocal: + BCJPreferLocalTables = append(BCJPreferLocalTables, tableNames2HintTableInfo(b.ctx, hint.HintName.L, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) case TiDBIndexNestedLoopJoin, HintINLJ: INLJTables = append(INLJTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) 
case HintINLHJ: @@ -2352,15 +2385,17 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType u } } b.tableHintInfo = append(b.tableHintInfo, tableHintInfo{ - sortMergeJoinTables: sortMergeTables, - indexNestedLoopJoinTables: indexNestedLoopJoinTables{INLJTables, INLHJTables, INLMJTables}, - hashJoinTables: hashJoinTables, - indexHintList: indexHintList, - tiflashTables: tiflashTables, - tikvTables: tikvTables, - aggHints: aggHints, - indexMergeHintList: indexMergeHintList, - timeRangeHint: timeRangeHint, + sortMergeJoinTables: sortMergeTables, + broadcastJoinTables: BCTables, + broadcastJoinPreferredLocal: BCJPreferLocalTables, + indexNestedLoopJoinTables: indexNestedLoopJoinTables{INLJTables, INLHJTables, INLMJTables}, + hashJoinTables: hashJoinTables, + indexHintList: indexHintList, + tiflashTables: tiflashTables, + tikvTables: tikvTables, + aggHints: aggHints, + indexMergeHintList: indexMergeHintList, + timeRangeHint: timeRangeHint, }) } @@ -2372,6 +2407,8 @@ func (b *PlanBuilder) popTableHints() { b.appendUnmatchedJoinHintWarning(HintINLHJ, "", hintInfo.indexNestedLoopJoinTables.inlhjTables) b.appendUnmatchedJoinHintWarning(HintINLMJ, "", hintInfo.indexNestedLoopJoinTables.inlmjTables) b.appendUnmatchedJoinHintWarning(HintSMJ, TiDBMergeJoin, hintInfo.sortMergeJoinTables) + b.appendUnmatchedJoinHintWarning(HintBCJ, TiDBBroadCastJoin, hintInfo.broadcastJoinTables) + b.appendUnmatchedJoinHintWarning(HintBCJPreferLocal, "", hintInfo.broadcastJoinPreferredLocal) b.appendUnmatchedJoinHintWarning(HintHJ, TiDBHashJoin, hintInfo.hashJoinTables) b.appendUnmatchedStorageHintWarning(hintInfo.tiflashTables, hintInfo.tikvTables) b.tableHintInfo = b.tableHintInfo[:len(b.tableHintInfo)-1] diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 6ff83519f7de4..b0d32974c71e8 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -106,6 +106,7 @@ const ( preferRightAsINLMJInner preferHashJoin preferMergeJoin + preferBCJoin preferHashAgg preferStreamAgg ) diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 00697ddd628e7..357bda244d9eb 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -52,6 +52,7 @@ var ( _ PhysicalPlan = &PhysicalStreamAgg{} _ PhysicalPlan = &PhysicalApply{} _ PhysicalPlan = &PhysicalIndexJoin{} + _ PhysicalPlan = &PhysicalBroadCastJoin{} _ PhysicalPlan = &PhysicalHashJoin{} _ PhysicalPlan = &PhysicalMergeJoin{} _ PhysicalPlan = &PhysicalUnionScan{} @@ -73,6 +74,27 @@ type PhysicalTableReader struct { StoreType kv.StoreType } +// GetTablePlan exports the tablePlan. +func (p *PhysicalTableReader) GetTablePlan() PhysicalPlan { + return p.tablePlan +} + +// GetTableScan exports the tableScan that contained in tablePlan. +func (p *PhysicalTableReader) GetTableScan() *PhysicalTableScan { + curPlan := p.tablePlan + for { + chCnt := len(curPlan.Children()) + if chCnt == 0 { + return curPlan.(*PhysicalTableScan) + } else if chCnt == 1 { + curPlan = curPlan.Children()[0] + } else { + join := curPlan.(*PhysicalBroadCastJoin) + curPlan = join.children[1-join.globalChildIndex] + } + } +} + // GetPhysicalTableReader returns PhysicalTableReader for logical TiKVSingleGather. 
func (sg *TiKVSingleGather) GetPhysicalTableReader(schema *expression.Schema, stats *property.StatsInfo, props ...*property.PhysicalProperty) *PhysicalTableReader { reader := PhysicalTableReader{}.Init(sg.ctx, sg.blockOffset) @@ -246,6 +268,8 @@ type PhysicalTableScan struct { StoreType kv.StoreType + IsGlobalRead bool + // The table scan may be a partition, rather than a real table. isPartition bool // KeepOrder is true, if sort data by scanning pkcol, @@ -415,6 +439,30 @@ type PhysicalMergeJoin struct { Desc bool } +<<<<<<< HEAD +======= +// PhysicalBroadCastJoin only works for the TiFlash engine, which broadcasts the small table to every replica of the probe-side table. +type PhysicalBroadCastJoin struct { + basePhysicalJoin + globalChildIndex int +} + +// Clone implements PhysicalPlan interface. +func (p *PhysicalMergeJoin) Clone() (PhysicalPlan, error) { + cloned := new(PhysicalMergeJoin) + base, err := p.basePhysicalJoin.cloneWithSelf(cloned) + if err != nil { + return nil, err + } + cloned.basePhysicalJoin = *base + for _, cf := range p.CompareFuncs { + cloned.CompareFuncs = append(cloned.CompareFuncs, cf) + } + cloned.Desc = p.Desc + return cloned, nil +} + +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) // PhysicalLock is the physical operator of lock, which is used for `select ... for update` clause. type PhysicalLock struct { basePhysicalPlan diff --git a/planner/core/plan.go b/planner/core/plan.go index b4a70d6dc7767..44d67c6466dcf 100644 --- a/planner/core/plan.go +++ b/planner/core/plan.go @@ -20,6 +20,7 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/sessionctx" @@ -208,7 +209,7 @@ type PhysicalPlan interface { attach2Task(...task) task // ToPB converts physical plan to tipb executor. - ToPB(ctx sessionctx.Context) (*tipb.Executor, error) + ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) // getChildReqProps gets the required property by child index. GetChildReqProps(idx int) *property.PhysicalProperty diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index 39537a20c2406..b9dba1e4e79b4 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -16,8 +16,10 @@ package core import ( "github.com/pingcap/errors" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" @@ -28,12 +30,12 @@ import ( ) // ToPB implements PhysicalPlan ToPB interface. -func (p *basePhysicalPlan) ToPB(_ sessionctx.Context) (*tipb.Executor, error) { +func (p *basePhysicalPlan) ToPB(_ sessionctx.Context, _ kv.StoreType) (*tipb.Executor, error) { return nil, errors.Errorf("plan %s fails converts to PB", p.basePlan.ExplainID()) } // ToPB implements PhysicalPlan ToPB interface.
-func (p *PhysicalHashAgg) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalHashAgg) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { sc := ctx.GetSessionVars().StmtCtx client := ctx.GetClient() groupByExprs, err := expression.ExpressionsToPBList(sc, p.GroupByItems, client) @@ -46,11 +48,20 @@ func (p *PhysicalHashAgg) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { for _, aggFunc := range p.AggFuncs { aggExec.AggFunc = append(aggExec.AggFunc, aggregation.AggFuncToPBExpr(sc, client, aggFunc)) } - return &tipb.Executor{Tp: tipb.ExecType_TypeAggregation, Aggregation: aggExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + aggExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeAggregation, Aggregation: aggExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. -func (p *PhysicalStreamAgg) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalStreamAgg) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { sc := ctx.GetSessionVars().StmtCtx client := ctx.GetClient() groupByExprs, err := expression.ExpressionsToPBList(sc, p.GroupByItems, client) @@ -63,11 +74,20 @@ func (p *PhysicalStreamAgg) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) for _, aggFunc := range p.AggFuncs { aggExec.AggFunc = append(aggExec.AggFunc, aggregation.AggFuncToPBExpr(sc, client, aggFunc)) } - return &tipb.Executor{Tp: tipb.ExecType_TypeStreamAgg, Aggregation: aggExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + aggExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeStreamAgg, Aggregation: aggExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. -func (p *PhysicalSelection) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalSelection) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { sc := ctx.GetSessionVars().StmtCtx client := ctx.GetClient() conditions, err := expression.ExpressionsToPBList(sc, p.Conditions, client) @@ -77,11 +97,20 @@ func (p *PhysicalSelection) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) selExec := &tipb.Selection{ Conditions: conditions, } - return &tipb.Executor{Tp: tipb.ExecType_TypeSelection, Selection: selExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + selExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeSelection, Selection: selExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. 
-func (p *PhysicalTopN) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalTopN) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { sc := ctx.GetSessionVars().StmtCtx client := ctx.GetClient() topNExec := &tipb.TopN{ @@ -90,29 +119,64 @@ func (p *PhysicalTopN) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { for _, item := range p.ByItems { topNExec.OrderBy = append(topNExec.OrderBy, expression.SortByItemToPB(sc, client, item.Expr, item.Desc)) } - return &tipb.Executor{Tp: tipb.ExecType_TypeTopN, TopN: topNExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + topNExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeTopN, TopN: topNExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. -func (p *PhysicalLimit) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalLimit) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { limitExec := &tipb.Limit{ Limit: p.Count, } - return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec}, nil + executorID := "" + if storeType == kv.TiFlash { + var err error + limitExec.Child, err = p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + executorID = p.ExplainID().String() + } + return &tipb.Executor{Tp: tipb.ExecType_TypeLimit, Limit: limitExec, ExecutorId: &executorID}, nil } // ToPB implements PhysicalPlan ToPB interface. +<<<<<<< HEAD func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { tsExec := &tipb.TableScan{ TableId: p.Table.ID, Columns: util.ColumnsToProto(p.Columns, p.Table.PKIsHandle), Desc: p.Desc, } +======= +func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { + tsExec := tables.BuildTableScanFromInfos(p.Table, p.Columns) + tsExec.Desc = p.Desc +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) if p.isPartition { tsExec.TableId = p.physicalTableID } + executorID := "" + if storeType == kv.TiFlash && p.IsGlobalRead { + tsExec.NextReadEngine = tipb.EngineType_TiFlash + ranges := distsql.TableRangesToKVRanges(tsExec.TableId, p.Ranges, nil) + for _, keyRange := range ranges { + tsExec.Ranges = append(tsExec.Ranges, tipb.KeyRange{Low: keyRange.StartKey, High: keyRange.EndKey}) + } + } + if storeType == kv.TiFlash { + executorID = p.ExplainID().String() + } err := SetPBColumnsDefaultValue(ctx, tsExec.Columns, p.Columns) - return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tsExec}, err + return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tsExec, ExecutorId: &executorID}, err } // checkCoverIndex checks whether we can pass unique info to TiKV. We should push it if and only if the length of @@ -140,7 +204,7 @@ func findColumnInfoByID(infos []*model.ColumnInfo, id int64) *model.ColumnInfo { } // ToPB implements PhysicalPlan ToPB interface. 
-func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context, _ kv.StoreType) (*tipb.Executor, error) { columns := make([]*model.ColumnInfo, 0, p.schema.Len()) tableColumns := p.Table.Cols() for _, col := range p.schema.Columns { @@ -164,6 +228,48 @@ func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec}, nil } +// ToPB implements PhysicalPlan ToPB interface. +func (p *PhysicalBroadCastJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { + sc := ctx.GetSessionVars().StmtCtx + client := ctx.GetClient() + leftJoinKeys := make([]expression.Expression, 0, len(p.LeftJoinKeys)) + rightJoinKeys := make([]expression.Expression, 0, len(p.RightJoinKeys)) + for _, leftKey := range p.LeftJoinKeys { + leftJoinKeys = append(leftJoinKeys, leftKey) + } + for _, rightKey := range p.RightJoinKeys { + rightJoinKeys = append(rightJoinKeys, rightKey) + } + lChildren, err := p.children[0].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + rChildren, err := p.children[1].ToPB(ctx, storeType) + if err != nil { + return nil, errors.Trace(err) + } + + left, err := expression.ExpressionsToPBList(sc, leftJoinKeys, client) + if err != nil { + return nil, err + } + right, err := expression.ExpressionsToPBList(sc, rightJoinKeys, client) + if err != nil { + return nil, err + } + join := &tipb.Join{ + JoinType: tipb.JoinType_TypeInnerJoin, + JoinExecType: tipb.JoinExecType_TypeHashJoin, + InnerIdx: int64(p.InnerChildIdx), + LeftJoinKeys: left, + RightJoinKeys: right, + Children: []*tipb.Executor{lChildren, rChildren}, + } + + executorID := p.ExplainID().String() + return &tipb.Executor{Tp: tipb.ExecType_TypeJoin, Join: join, ExecutorId: &executorID}, nil +} + // SetPBColumnsDefaultValue sets the default values of tipb.ColumnInfos. 
func SetPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnInfo, columns []*model.ColumnInfo) error { for i, c := range columns { diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index a895f32a3ec38..2271520415b08 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -72,14 +72,16 @@ type indexNestedLoopJoinTables struct { type tableHintInfo struct { indexNestedLoopJoinTables - sortMergeJoinTables []hintTableInfo - hashJoinTables []hintTableInfo - indexHintList []indexHintInfo - tiflashTables []hintTableInfo - tikvTables []hintTableInfo - aggHints aggHintInfo - indexMergeHintList []indexHintInfo - timeRangeHint ast.HintTimeRange + sortMergeJoinTables []hintTableInfo + broadcastJoinTables []hintTableInfo + broadcastJoinPreferredLocal []hintTableInfo + hashJoinTables []hintTableInfo + indexHintList []indexHintInfo + tiflashTables []hintTableInfo + tikvTables []hintTableInfo + aggHints aggHintInfo + indexMergeHintList []indexHintInfo + timeRangeHint ast.HintTimeRange } type hintTableInfo struct { @@ -157,10 +159,30 @@ func tableNames2HintTableInfo(ctx sessionctx.Context, hintTables []ast.HintTable return hintTableInfos } +// ifPreferAsLocalInBCJoin checks if there is a data source specified as local read by hint +func (info *tableHintInfo) ifPreferAsLocalInBCJoin(p LogicalPlan, blockOffset int) bool { + alias := extractTableAlias(p, blockOffset) + if alias != nil { + tableNames := make([]*hintTableInfo, 1) + tableNames[0] = alias + return info.matchTableName(tableNames, info.broadcastJoinPreferredLocal) + } + for _, c := range p.Children() { + if info.ifPreferAsLocalInBCJoin(c, blockOffset) { + return true + } + } + return false +} + func (info *tableHintInfo) ifPreferMergeJoin(tableNames ...*hintTableInfo) bool { return info.matchTableName(tableNames, info.sortMergeJoinTables) } +func (info *tableHintInfo) ifPreferBroadcastJoin(tableNames ...*hintTableInfo) bool { + return info.matchTableName(tableNames, info.broadcastJoinTables) +} + func (info *tableHintInfo) ifPreferHashJoin(tableNames ...*hintTableInfo) bool { return info.matchTableName(tableNames, info.hashJoinTables) } @@ -713,7 +735,31 @@ func isPrimaryIndex(indexName model.CIStr) bool { return indexName.L == "primary" } +<<<<<<< HEAD func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr) ([]*util.AccessPath, error) { +======= +func genTiFlashPath(tblInfo *model.TableInfo, isGlobalRead bool) *util.AccessPath { + tiFlashPath := &util.AccessPath{StoreType: kv.TiFlash, IsTiFlashGlobalRead: isGlobalRead} + fillContentForTablePath(tiFlashPath, tblInfo) + return tiFlashPath +} + +func fillContentForTablePath(tablePath *util.AccessPath, tblInfo *model.TableInfo) { + if tblInfo.IsCommonHandle { + tablePath.IsCommonHandlePath = true + for _, index := range tblInfo.Indices { + if index.Primary { + tablePath.Index = index + break + } + } + } else { + tablePath.IsIntHandlePath = true + } +} + +func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr) ([]*util.AccessPath, error) { +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. 
(#17232) tblInfo := tbl.Meta() publicPaths := make([]*util.AccessPath, 0, len(tblInfo.Indices)+2) tp := kv.TiKV @@ -722,7 +768,12 @@ func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tbl ta } publicPaths = append(publicPaths, &util.AccessPath{IsTablePath: true, StoreType: tp}) if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available { +<<<<<<< HEAD publicPaths = append(publicPaths, &util.AccessPath{IsTablePath: true, StoreType: kv.TiFlash}) +======= + publicPaths = append(publicPaths, genTiFlashPath(tblInfo, false)) + publicPaths = append(publicPaths, genTiFlashPath(tblInfo, true)) +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) } for _, index := range tblInfo.Indices { if index.State == model.StatePublic { diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index e2b5eb5197d07..d52a8b222a2e8 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -93,7 +93,7 @@ func (p *PointGetPlan) attach2Task(...task) task { } // ToPB converts physical plan to tipb executor. -func (p *PointGetPlan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PointGetPlan) ToPB(ctx sessionctx.Context, _ kv.StoreType) (*tipb.Executor, error) { return nil, nil } @@ -248,7 +248,7 @@ func (p *BatchPointGetPlan) attach2Task(...task) task { } // ToPB converts physical plan to tipb executor. -func (p *BatchPointGetPlan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *BatchPointGetPlan) ToPB(ctx sessionctx.Context, _ kv.StoreType) (*tipb.Executor, error) { return nil, nil } diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go index f322e00dad4bc..a9c878d9bc2c6 100644 --- a/planner/core/resolve_indices.go +++ b/planner/core/resolve_indices.go @@ -113,6 +113,49 @@ func (p *PhysicalHashJoin) ResolveIndices() (err error) { return } +// ResolveIndices implements Plan interface. +func (p *PhysicalBroadCastJoin) ResolveIndices() (err error) { + err = p.physicalSchemaProducer.ResolveIndices() + if err != nil { + return err + } + lSchema := p.children[0].Schema() + rSchema := p.children[1].Schema() + for i, col := range p.LeftJoinKeys { + newKey, err := col.ResolveIndices(lSchema) + if err != nil { + return err + } + p.LeftJoinKeys[i] = newKey.(*expression.Column) + } + for i, col := range p.RightJoinKeys { + newKey, err := col.ResolveIndices(rSchema) + if err != nil { + return err + } + p.RightJoinKeys[i] = newKey.(*expression.Column) + } + for i, expr := range p.LeftConditions { + p.LeftConditions[i], err = expr.ResolveIndices(lSchema) + if err != nil { + return err + } + } + for i, expr := range p.RightConditions { + p.RightConditions[i], err = expr.ResolveIndices(rSchema) + if err != nil { + return err + } + } + for i, expr := range p.OtherConditions { + p.OtherConditions[i], err = expr.ResolveIndices(expression.MergeSchema(lSchema, rSchema)) + if err != nil { + return err + } + } + return +} + // ResolveIndices implements Plan interface. 
func (p *PhysicalMergeJoin) ResolveIndices() (err error) { err = p.physicalSchemaProducer.ResolveIndices() diff --git a/planner/core/task.go b/planner/core/task.go index b38bc98de3d78..721d3ebbf5602 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -151,6 +151,9 @@ func (t *copTask) getStoreType() kv.StoreType { } tp := t.tablePlan for len(tp.Children()) > 0 { + if len(tp.Children()) > 1 { + return kv.TiFlash + } tp = tp.Children()[0] } if ts, ok := tp.(*PhysicalTableScan); ok { @@ -519,10 +522,70 @@ func (p *PhysicalHashJoin) attach2Task(tasks ...task) task { lTask := finishCopTask(p.ctx, tasks[0].copy()) rTask := finishCopTask(p.ctx, tasks[1].copy()) p.SetChildren(lTask.plan(), rTask.plan()) - return &rootTask{ + task := &rootTask{ p: p, cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count()), } + return task +} + +// GetCost computes cost of broadcast join operator itself. +func (p *PhysicalBroadCastJoin) GetCost(lCnt, rCnt float64) float64 { + buildCnt := lCnt + if p.InnerChildIdx == 1 { + buildCnt = rCnt + } + sessVars := p.ctx.GetSessionVars() + // Cost of building hash table. + cpuCost := buildCnt * sessVars.CopCPUFactor + memoryCost := buildCnt * sessVars.MemoryFactor + // Number of matched row pairs regarding the equal join conditions. + helper := &fullJoinRowCountHelper{ + cartesian: false, + leftProfile: p.children[0].statsInfo(), + rightProfile: p.children[1].statsInfo(), + leftJoinKeys: p.LeftJoinKeys, + rightJoinKeys: p.RightJoinKeys, + leftSchema: p.children[0].Schema(), + rightSchema: p.children[1].Schema(), + } + numPairs := helper.estimate() + probeCost := numPairs * sessVars.CopCPUFactor + // should divided by the concurrency in tiflash, which should be the number of core in tiflash nodes. + probeCost /= float64(sessVars.CopTiFlashConcurrencyFactor) + cpuCost += probeCost + + // todo since TiFlash join is significant faster than TiDB join, maybe + // need to add a variable like 'tiflash_accelerate_factor', and divide + // the final cost by that factor + return cpuCost + memoryCost +} + +func (p *PhysicalBroadCastJoin) attach2Task(tasks ...task) task { + lTask, lok := tasks[0].(*copTask) + rTask, rok := tasks[1].(*copTask) + if !lok || !rok || (lTask.getStoreType() != kv.TiFlash && rTask.getStoreType() != kv.TiFlash) { + return invalidTask + } + p.SetChildren(lTask.plan(), rTask.plan()) + p.schema = BuildPhysicalJoinSchema(p.JoinType, p) + if !lTask.indexPlanFinished { + lTask.finishIndexPlan() + } + if !rTask.indexPlanFinished { + rTask.finishIndexPlan() + } + + lCost := lTask.cost() + rCost := rTask.cost() + + task := &copTask{ + tblColHists: rTask.tblColHists, + indexPlanFinished: true, + tablePlan: p, + cst: lCost + rCost + p.GetCost(lTask.count(), rTask.count()), + } + return task } // GetCost computes cost of merge join operator itself. 
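For illustration, a minimal standalone sketch of the cost formula that PhysicalBroadCastJoin.GetCost introduces above; the helper name broadcastJoinCost and the numeric values are hypothetical placeholders, while in the real code the factors come from the session variables (CopCPUFactor, MemoryFactor, tidb_opt_tiflash_concurrency_factor) and numPairs comes from the join row-count estimation:

package main

import "fmt"

// broadcastJoinCost mirrors PhysicalBroadCastJoin.GetCost: the build
// (broadcast) side pays CPU to build the hash table plus memory to hold it,
// and the probe cost is divided by the TiFlash concurrency because probing
// runs inside the TiFlash coprocessor rather than in TiDB.
func broadcastJoinCost(buildCnt, numPairs, copCPUFactor, memoryFactor, tiflashConcurrency float64) float64 {
	cpuCost := buildCnt * copCPUFactor
	memoryCost := buildCnt * memoryFactor
	probeCost := numPairs * copCPUFactor / tiflashConcurrency
	return cpuCost + probeCost + memoryCost
}

func main() {
	// Hypothetical numbers: a 2-row dimension table broadcast against an
	// 8-row fact table, with placeholder cost factors.
	fmt.Printf("%.4f\n", broadcastJoinCost(2, 8, 3.0, 0.001, 24.0))
}

Dividing the probe cost by the concurrency factor is what allows this cop task to undercut the root-task hash join once the probe side grows large enough.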
@@ -668,7 +731,12 @@ func finishCopTask(ctx sessionctx.Context, task task) task { } else { tp := t.tablePlan for len(tp.Children()) > 0 { - tp = tp.Children()[0] + if len(tp.Children()) == 1 { + tp = tp.Children()[0] + } else { + join := tp.(*PhysicalBroadCastJoin) + tp = join.children[1-join.InnerChildIdx] + } } ts := tp.(*PhysicalTableScan) p := PhysicalTableReader{ diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index dd62c3b372578..64707fbaadce4 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -6,6 +6,15 @@ "explain select * from t where cast(t.a as float) + 3 = 5.1" ] }, + { + "name": "TestBroadcastJoin", + "cases": [ + "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", + "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", + "explain select /*+ broadcast_join(fact_t,d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", + "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k" + ] + }, { "name": "TestReadFromStorageHint", "cases": [ diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index f3ce566e30d0e..74fda0f45a033 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -20,6 +20,75 @@ } ] }, + { + "Name": "TestBroadcastJoin", + "Cases": [ + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", + "Plan": [ + "StreamAgg_32 1.00 root funcs:count(Column#14)->Column#11", + "└─TableReader_33 1.00 root data:StreamAgg_13", + " └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14", + " └─BroadcastJoin_31 8.00 cop[tiflash] ", + " ├─Selection_23(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_22 2.00 cop[tiflash] table:d1_t keep order:false, global read", + " └─Selection_21(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))", + " └─TableFullScan_20 8.00 cop[tiflash] table:fact_t keep order:false" + ] + }, + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", + "Plan": [ + "StreamAgg_52 1.00 root funcs:count(Column#20)->Column#17", + "└─TableReader_53 1.00 root data:StreamAgg_17", + " └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20", + " └─BroadcastJoin_51 8.00 cop[tiflash] ", + " ├─Selection_43(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", + " │ └─TableFullScan_42 2.00 cop[tiflash] table:d3_t keep order:false, global read", + " └─BroadcastJoin_33(Probe) 8.00 cop[tiflash] ", + " ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", + " │ └─TableFullScan_28 2.00 cop[tiflash] table:d2_t keep order:false, global read", + " └─BroadcastJoin_37(Probe) 8.00 cop[tiflash] ", + " ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_26 2.00 cop[tiflash] table:d1_t keep order:false, 
global read", + " └─Selection_41(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", + " └─TableFullScan_40 8.00 cop[tiflash] table:fact_t keep order:false" + ] + }, + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t), broadcast_join_local(d1_t) */ count(*) from fact_t, d1_t where fact_t.d1_k = d1_t.d1_k", + "Plan": [ + "StreamAgg_25 1.00 root funcs:count(Column#14)->Column#11", + "└─TableReader_26 1.00 root data:StreamAgg_13", + " └─StreamAgg_13 1.00 cop[tiflash] funcs:count(1)->Column#14", + " └─BroadcastJoin_24 8.00 cop[tiflash] ", + " ├─Selection_18(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_17 2.00 cop[tiflash] table:d1_t keep order:false", + " └─Selection_16(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k))", + " └─TableFullScan_15 8.00 cop[tiflash] table:fact_t keep order:false, global read" + ] + }, + { + "SQL": "explain select /*+ broadcast_join(fact_t,d1_t,d2_t,d3_t), broadcast_join_local(d2_t) */ count(*) from fact_t, d1_t, d2_t, d3_t where fact_t.d1_k = d1_t.d1_k and fact_t.d2_k = d2_t.d2_k and fact_t.d3_k = d3_t.d3_k", + "Plan": [ + "StreamAgg_36 1.00 root funcs:count(Column#20)->Column#17", + "└─TableReader_37 1.00 root data:StreamAgg_17", + " └─StreamAgg_17 1.00 cop[tiflash] funcs:count(1)->Column#20", + " └─BroadcastJoin_35 8.00 cop[tiflash] ", + " ├─Selection_29(Build) 2.00 cop[tiflash] not(isnull(test.d3_t.d3_k))", + " │ └─TableFullScan_28 2.00 cop[tiflash] table:d3_t keep order:false, global read", + " └─BroadcastJoin_19(Probe) 8.00 cop[tiflash] ", + " ├─Selection_27(Build) 2.00 cop[tiflash] not(isnull(test.d2_t.d2_k))", + " │ └─TableFullScan_26 2.00 cop[tiflash] table:d2_t keep order:false", + " └─BroadcastJoin_20(Probe) 8.00 cop[tiflash] ", + " ├─Selection_25(Build) 2.00 cop[tiflash] not(isnull(test.d1_t.d1_k))", + " │ └─TableFullScan_24 2.00 cop[tiflash] table:d1_t keep order:false, global read", + " └─Selection_23(Probe) 8.00 cop[tiflash] not(isnull(test.fact_t.d1_k)), not(isnull(test.fact_t.d2_k)), not(isnull(test.fact_t.d3_k))", + " └─TableFullScan_22 8.00 cop[tiflash] table:fact_t keep order:false, global read" + ] + } + ] + }, { "Name": "TestReadFromStorageHint", "Cases": [ diff --git a/planner/property/physical_property.go b/planner/property/physical_property.go index 38da639c71942..1753031bd0bb5 100644 --- a/planner/property/physical_property.go +++ b/planner/property/physical_property.go @@ -20,6 +20,10 @@ import ( "github.com/pingcap/tidb/util/codec" ) +// wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get +// these tasks one by one. +var wholeTaskTypes = []TaskType{CopSingleReadTaskType, CopDoubleReadTaskType, RootTaskType} + // Item wraps the column and its order. type Item struct { Col *expression.Column @@ -83,6 +87,20 @@ func (p *PhysicalProperty) AllColsFromSchema(schema *expression.Schema) bool { return true } +// IsFlashOnlyProp return true if this physical property is only allowed to generate flash related task +func (p *PhysicalProperty) IsFlashOnlyProp() bool { + return p.TaskTp == CopTiFlashLocalReadTaskType || p.TaskTp == CopTiFlashGlobalReadTaskType +} + +// GetAllPossibleChildTaskTypes enumrates the possible types of tasks for children. 
+func (p *PhysicalProperty) GetAllPossibleChildTaskTypes() []TaskType { + if p.TaskTp == RootTaskType { + return wholeTaskTypes + } + // TODO: For CopSingleReadTaskType and CopDoubleReadTaskType, this function should never be called + return []TaskType{p.TaskTp} +} + // IsPrefix checks whether the order property is the prefix of another. func (p *PhysicalProperty) IsPrefix(prop *PhysicalProperty) bool { if len(p.Items) > len(prop.Items) { diff --git a/planner/property/task_type.go b/planner/property/task_type.go index 93360fc14ce9b..3f91509ca710b 100644 --- a/planner/property/task_type.go +++ b/planner/property/task_type.go @@ -27,6 +27,17 @@ const ( // CopDoubleReadTaskType stands for the a IndexLookup tasks executed in the // coprocessor layer. CopDoubleReadTaskType + + // CopTiFlashLocalReadTaskType stands for flash coprocessor that read data locally, + // and only a part of the data is read in one cop task, if the current task type is + // CopTiFlashLocalReadTaskType, all its children prop's task type is CopTiFlashLocalReadTaskType + CopTiFlashLocalReadTaskType + + // CopTiFlashGlobalReadTaskType stands for flash coprocessor that read data globally + // and all the data of given table will be read in one cop task, if the current task + // type is CopTiFlashGlobalReadTaskType, all its children prop's task type is + // CopTiFlashGlobalReadTaskType + CopTiFlashGlobalReadTaskType ) // String implements fmt.Stringer interface. @@ -38,6 +49,10 @@ func (t TaskType) String() string { return "copSingleReadTask" case CopDoubleReadTaskType: return "copDoubleReadTask" + case CopTiFlashLocalReadTaskType: + return "copTiFlashLocalReadTask" + case CopTiFlashGlobalReadTaskType: + return "copTiFlashGlobalReadTask" } return "UnknownTaskType" } diff --git a/planner/util/path.go b/planner/util/path.go index e12ee41f569b9..8c1ff581b0479 100644 --- a/planner/util/path.go +++ b/planner/util/path.go @@ -50,8 +50,17 @@ type AccessPath struct { IsDNFCond bool +<<<<<<< HEAD // IsTablePath indicates whether this path is table path. IsTablePath bool +======= + // IsTiFlashGlobalRead indicates whether this path is a remote read path for tiflash + IsTiFlashGlobalRead bool + + // IsIntHandlePath indicates whether this path is table path. + IsIntHandlePath bool + IsCommonHandlePath bool +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) // Forced means this path is generated by `use/force index()`. Forced bool diff --git a/session/session.go b/session/session.go index 706e440a042b9..605bd71b1b6ca 100644 --- a/session/session.go +++ b/session/session.go @@ -1949,6 +1949,7 @@ var builtinGlobalVariable = []string{ variable.TiDBEnableIndexMerge, variable.TiDBTxnMode, variable.TiDBAllowBatchCop, + variable.TiDBOptBCJ, variable.TiDBRowFormatVersion, variable.TiDBEnableStmtSummary, variable.TiDBStmtSummaryInternalQuery, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 70e24677082e3..00e38dd6530ec 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -384,6 +384,8 @@ type SessionVars struct { // AllowAggPushDown can be set to false to forbid aggregation push down. AllowAggPushDown bool + // AllowBCJ means allow broadcast join. + AllowBCJ bool // AllowDistinctAggPushDown can be set true to allow agg with distinct push down to tikv/tiflash. AllowDistinctAggPushDown bool @@ -408,6 +410,8 @@ type SessionVars struct { CPUFactor float64 // CopCPUFactor is the CPU cost of processing one expression for one row in coprocessor. 
CopCPUFactor float64 + // CopTiFlashConcurrencyFactor is the concurrency number of computation in tiflash coprocessor. + CopTiFlashConcurrencyFactor float64 // NetworkFactor is the network cost of transferring 1 byte data. NetworkFactor float64 // ScanFactor is the IO cost of scanning 1 byte data on TiKV and TiFlash. @@ -657,6 +661,7 @@ func NewSessionVars() *SessionVars { Status: mysql.ServerStatusAutocommit, StmtCtx: new(stmtctx.StatementContext), AllowAggPushDown: false, + AllowBCJ: false, OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel, RetryLimit: DefTiDBRetryLimit, DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry, @@ -666,6 +671,7 @@ func NewSessionVars() *SessionVars { CorrelationExpFactor: DefOptCorrelationExpFactor, CPUFactor: DefOptCPUFactor, CopCPUFactor: DefOptCopCPUFactor, + CopTiFlashConcurrencyFactor: DefOptTiFlashConcurrencyFactor, NetworkFactor: DefOptNetworkFactor, ScanFactor: DefOptScanFactor, DescScanFactor: DefOptDescScanFactor, @@ -1075,6 +1081,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.SkipUTF8Check = TiDBOptOn(val) case TiDBOptAggPushDown: s.AllowAggPushDown = TiDBOptOn(val) + case TiDBOptBCJ: + s.AllowBCJ = TiDBOptOn(val) case TiDBOptDistinctAggPushDown: s.AllowDistinctAggPushDown = TiDBOptOn(val) case TiDBOptWriteRowID: @@ -1089,6 +1097,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.CPUFactor = tidbOptFloat64(val, DefOptCPUFactor) case TiDBOptCopCPUFactor: s.CopCPUFactor = tidbOptFloat64(val, DefOptCopCPUFactor) + case TiDBOptTiFlashConcurrencyFactor: + s.CopTiFlashConcurrencyFactor = tidbOptFloat64(val, DefOptTiFlashConcurrencyFactor) case TiDBOptNetworkFactor: s.NetworkFactor = tidbOptFloat64(val, DefOptNetworkFactor) case TiDBOptScanFactor: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index a274f3b067afd..7bfa4019bc817 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -611,6 +611,7 @@ var defaultSysVars = []*SysVar{ /* TiDB specific variables */ {ScopeSession, TiDBSnapshot, ""}, {ScopeSession, TiDBOptAggPushDown, BoolToIntStr(DefOptAggPushDown)}, + {ScopeGlobal | ScopeSession, TiDBOptBCJ, BoolToIntStr(DefOptBCJ)}, {ScopeSession, TiDBOptDistinctAggPushDown, BoolToIntStr(config.GetGlobalConfig().Performance.DistinctAggPushDown)}, {ScopeSession, TiDBOptWriteRowID, BoolToIntStr(DefOptWriteRowID)}, {ScopeGlobal | ScopeSession, TiDBBuildStatsConcurrency, strconv.Itoa(DefBuildStatsConcurrency)}, @@ -623,6 +624,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBOptCorrelationThreshold, strconv.FormatFloat(DefOptCorrelationThreshold, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptCorrelationExpFactor, strconv.Itoa(DefOptCorrelationExpFactor)}, {ScopeGlobal | ScopeSession, TiDBOptCPUFactor, strconv.FormatFloat(DefOptCPUFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptTiFlashConcurrencyFactor, strconv.FormatFloat(DefOptTiFlashConcurrencyFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptCopCPUFactor, strconv.FormatFloat(DefOptCopCPUFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptNetworkFactor, strconv.FormatFloat(DefOptNetworkFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptScanFactor, strconv.FormatFloat(DefOptScanFactor, 'f', -1, 64)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 9649329d63877..f9126b377fddb 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -45,6 +45,7 @@ const ( // 
tidb_opt_agg_push_down is used to enable/disable the optimizer rule of aggregation push down. TiDBOptAggPushDown = "tidb_opt_agg_push_down" + TiDBOptBCJ = "tidb_opt_broadcast_join" // tidb_opt_distinct_agg_push_down is used to decide whether agg with distinct should be pushed to tikv/tiflash. TiDBOptDistinctAggPushDown = "tidb_opt_distinct_agg_push_down" @@ -201,6 +202,8 @@ const ( TiDBOptCPUFactor = "tidb_opt_cpu_factor" // tidb_opt_copcpu_factor is the CPU cost of processing one expression for one row in coprocessor. TiDBOptCopCPUFactor = "tidb_opt_copcpu_factor" + // tidb_opt_tiflash_concurrency_factor is concurrency number of tiflash computation. + TiDBOptTiFlashConcurrencyFactor = "tidb_opt_tiflash_concurrency_factor" // tidb_opt_network_factor is the network cost of transferring 1 byte data. TiDBOptNetworkFactor = "tidb_opt_network_factor" // tidb_opt_scan_factor is the IO cost of scanning 1 byte data on TiKV. @@ -426,11 +429,13 @@ const ( DefChecksumTableConcurrency = 4 DefSkipUTF8Check = false DefOptAggPushDown = false + DefOptBCJ = false DefOptWriteRowID = false DefOptCorrelationThreshold = 0.9 DefOptCorrelationExpFactor = 1 DefOptCPUFactor = 3.0 DefOptCopCPUFactor = 3.0 + DefOptTiFlashConcurrencyFactor = 24.0 DefOptNetworkFactor = 1.0 DefOptScanFactor = 1.5 DefOptDescScanFactor = 3.0 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index b94f7cbe4042a..639fd41ba9742 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -435,8 +435,18 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc return "ON", nil } return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) +<<<<<<< HEAD case TiDBSkipUTF8Check, TiDBOptAggPushDown, TiDBOptDistinctAggPushDown, TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze, +======= + case TiDBOptBCJ: + if (strings.EqualFold(value, "ON") || value == "1") && vars.AllowBatchCop == 0 { + return value, ErrWrongValueForVar.GenWithStackByArgs("Can't set Broadcast Join to 1 but tidb_allow_batch_cop is 0, please active batch cop at first.") + } + return value, nil + case TiDBSkipUTF8Check, TiDBSkipASCIICheck, TiDBOptAggPushDown, + TiDBOptDistinctAggPushDown, TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze, +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. 
(#17232) TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBEnableChunkRPC, TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction, TiDBPProfSQLCPU, TiDBLowResolutionTSO, TiDBEnableIndexMerge, TiDBEnableNoopFuncs, @@ -547,11 +557,15 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc if err != nil { return value, ErrWrongTypeForVar.GenWithStackByArgs(name) } + if v == 0 && vars.AllowBCJ { + return value, ErrWrongValueForVar.GenWithStackByArgs("Can't set batch cop 0 but tidb_opt_broadcast_join is 1, please set tidb_opt_broadcast_join 0 at first") + } if v < 0 || v > 2 { return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) } return value, nil case TiDBOptCPUFactor, + TiDBOptTiFlashConcurrencyFactor, TiDBOptCopCPUFactor, TiDBOptNetworkFactor, TiDBOptScanFactor, diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 0c24b698e3c88..29ab3f190e34b 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -65,11 +65,26 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.IndexLookupJoinConcurrency, Equals, DefIndexLookupJoinConcurrency) c.Assert(vars.HashJoinConcurrency, Equals, DefTiDBHashJoinConcurrency) c.Assert(vars.AllowBatchCop, Equals, DefTiDBAllowBatchCop) +<<<<<<< HEAD c.Assert(vars.ProjectionConcurrency, Equals, int64(DefTiDBProjectionConcurrency)) c.Assert(vars.HashAggPartialConcurrency, Equals, DefTiDBHashAggPartialConcurrency) c.Assert(vars.HashAggFinalConcurrency, Equals, DefTiDBHashAggFinalConcurrency) c.Assert(vars.WindowConcurrency, Equals, DefTiDBWindowConcurrency) c.Assert(vars.DistSQLScanConcurrency, Equals, DefDistSQLScanConcurrency) +======= + c.Assert(vars.AllowBCJ, Equals, DefOptBCJ) + c.Assert(vars.projectionConcurrency, Equals, ConcurrencyUnset) + c.Assert(vars.hashAggPartialConcurrency, Equals, ConcurrencyUnset) + c.Assert(vars.hashAggFinalConcurrency, Equals, ConcurrencyUnset) + c.Assert(vars.windowConcurrency, Equals, ConcurrencyUnset) + c.Assert(vars.distSQLScanConcurrency, Equals, DefDistSQLScanConcurrency) + c.Assert(vars.ProjectionConcurrency(), Equals, DefExecutorConcurrency) + c.Assert(vars.HashAggPartialConcurrency(), Equals, DefExecutorConcurrency) + c.Assert(vars.HashAggFinalConcurrency(), Equals, DefExecutorConcurrency) + c.Assert(vars.WindowConcurrency(), Equals, DefExecutorConcurrency) + c.Assert(vars.DistSQLScanConcurrency(), Equals, DefDistSQLScanConcurrency) + c.Assert(vars.ExecutorConcurrency, Equals, DefExecutorConcurrency) +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. 
(#17232) c.Assert(vars.MaxChunkSize, Equals, DefMaxChunkSize) c.Assert(vars.DMLBatchSize, Equals, DefDMLBatchSize) c.Assert(vars.MemQuotaQuery, Equals, config.GetGlobalConfig().MemQuotaQuery) @@ -194,6 +209,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(v.SQLMode, Equals, mode) } + err = SetSessionSystemVar(v, "tidb_opt_broadcast_join", types.NewStringDatum("1")) + c.Assert(err, IsNil) + err = SetSessionSystemVar(v, "tidb_allow_batch_cop", types.NewStringDatum("0")) + c.Assert(terror.ErrorEqual(err, ErrWrongValueForVar), IsTrue) + err = SetSessionSystemVar(v, "tidb_opt_broadcast_join", types.NewStringDatum("0")) + c.Assert(err, IsNil) + err = SetSessionSystemVar(v, "tidb_allow_batch_cop", types.NewStringDatum("0")) + c.Assert(err, IsNil) + err = SetSessionSystemVar(v, "tidb_opt_broadcast_join", types.NewStringDatum("1")) + c.Assert(terror.ErrorEqual(err, ErrWrongValueForVar), IsTrue) + // Combined sql_mode SetSessionSystemVar(v, "sql_mode", types.NewStringDatum("REAL_AS_FLOAT,ANSI_QUOTES")) c.Assert(v.SQLMode, Equals, mysql.ModeRealAsFloat|mysql.ModeANSIQuotes) @@ -316,6 +342,14 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(val, Equals, "5.0") c.Assert(v.CopCPUFactor, Equals, 5.0) + c.Assert(v.CopTiFlashConcurrencyFactor, Equals, 24.0) + err = SetSessionSystemVar(v, TiDBOptTiFlashConcurrencyFactor, types.NewStringDatum("5.0")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptTiFlashConcurrencyFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "5.0") + c.Assert(v.CopCPUFactor, Equals, 5.0) + c.Assert(v.NetworkFactor, Equals, 1.0) err = SetSessionSystemVar(v, TiDBOptNetworkFactor, types.NewStringDatum("3.0")) c.Assert(err, IsNil) @@ -491,6 +525,7 @@ func (s *testVarsutilSuite) TestValidate(c *C) { {TiDBOptCorrelationThreshold, "-2", true}, {TiDBOptCPUFactor, "a", true}, {TiDBOptCPUFactor, "-2", true}, + {TiDBOptTiFlashConcurrencyFactor, "-2", true}, {TiDBOptCopCPUFactor, "a", true}, {TiDBOptCopCPUFactor, "-2", true}, {TiDBOptNetworkFactor, "a", true}, diff --git a/store/mockstore/mocktikv/cop_handler_dag.go b/store/mockstore/mocktikv/cop_handler_dag.go index c418d5b99194b..9886f608ae4fc 100644 --- a/store/mockstore/mocktikv/cop_handler_dag.go +++ b/store/mockstore/mocktikv/cop_handler_dag.go @@ -60,7 +60,7 @@ func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) *coprocessor. resp.RegionError = err return resp } - dagCtx, e, dagReq, err := h.buildDAGExecutor(req) + dagCtx, e, dagReq, err := h.buildDAGExecutor(req, false) if err != nil { resp.OtherError = err.Error() return resp @@ -92,7 +92,7 @@ func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) *coprocessor. 
return buildResp(selResp, execDetails, err) } -func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, executor, *tipb.DAGRequest, error) { +func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request, batchCop bool) (*dagContext, executor, *tipb.DAGRequest, error) { if len(req.Ranges) == 0 { return nil, nil, nil, errors.New("request range is null") } @@ -118,7 +118,12 @@ func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, ex startTS: req.StartTs, evalCtx: &evalContext{sc: sc}, } - e, err := h.buildDAG(ctx, dagReq.Executors) + var e executor + if batchCop { + e, err = h.buildDAGForTiFlash(ctx, dagReq.RootExecutor) + } else { + e, err = h.buildDAG(ctx, dagReq.Executors) + } if err != nil { return nil, nil, nil, errors.Trace(err) } @@ -133,7 +138,7 @@ func constructTimeZone(name string, offset int) (*time.Location, error) { } func (h *rpcHandler) handleCopStream(ctx context.Context, req *coprocessor.Request) (tikvpb.Tikv_CoprocessorStreamClient, error) { - dagCtx, e, dagReq, err := h.buildDAGExecutor(req) + dagCtx, e, dagReq, err := h.buildDAGExecutor(req, false) if err != nil { return nil, errors.Trace(err) } @@ -146,9 +151,10 @@ func (h *rpcHandler) handleCopStream(ctx context.Context, req *coprocessor.Reque }, nil } -func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, error) { +func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, *tipb.Executor, error) { var currExec executor var err error + var childExec *tipb.Executor switch curr.GetTp() { case tipb.ExecType_TypeTableScan: currExec, err = h.buildTableScan(ctx, curr) @@ -156,26 +162,46 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, currExec, err = h.buildIndexScan(ctx, curr) case tipb.ExecType_TypeSelection: currExec, err = h.buildSelection(ctx, curr) + childExec = curr.Selection.Child case tipb.ExecType_TypeAggregation: currExec, err = h.buildHashAgg(ctx, curr) + childExec = curr.Aggregation.Child case tipb.ExecType_TypeStreamAgg: currExec, err = h.buildStreamAgg(ctx, curr) + childExec = curr.Aggregation.Child case tipb.ExecType_TypeTopN: currExec, err = h.buildTopN(ctx, curr) + childExec = curr.TopN.Child case tipb.ExecType_TypeLimit: currExec = &limitExec{limit: curr.Limit.GetLimit(), execDetail: new(execDetail)} + childExec = curr.Limit.Child default: // TODO: Support other types. 
err = errors.Errorf("this exec type %v doesn't support yet.", curr.GetTp()) } - return currExec, errors.Trace(err) + return currExec, childExec, errors.Trace(err) +} + +func (h *rpcHandler) buildDAGForTiFlash(ctx *dagContext, farther *tipb.Executor) (executor, error) { + curr, child, err := h.buildExec(ctx, farther) + if err != nil { + return nil, errors.Trace(err) + } + if child != nil { + childExec, err := h.buildDAGForTiFlash(ctx, child) + if err != nil { + return nil, errors.Trace(err) + } + curr.SetSrcExec(childExec) + } + return curr, nil } func (h *rpcHandler) buildDAG(ctx *dagContext, executors []*tipb.Executor) (executor, error) { var src executor for i := 0; i < len(executors); i++ { - curr, err := h.buildExec(ctx, executors[i]) + curr, _, err := h.buildExec(ctx, executors[i]) if err != nil { return nil, errors.Trace(err) } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 324e838d6b574..7a16364c6834c 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -663,6 +663,47 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb return resp } +<<<<<<< HEAD +======= +func drainRowsFromExecutor(ctx context.Context, e executor, req *tipb.DAGRequest) (tipb.Chunk, error) { + var chunk tipb.Chunk + for { + row, err := e.Next(ctx) + if err != nil { + return chunk, errors.Trace(err) + } + if row == nil { + return chunk, nil + } + for _, offset := range req.OutputOffsets { + chunk.RowsData = append(chunk.RowsData, row[offset]...) + } + } +} + +func (h *rpcHandler) handleBatchCopRequest(ctx context.Context, req *coprocessor.BatchRequest) (*mockBatchCopDataClient, error) { + client := &mockBatchCopDataClient{} + for _, ri := range req.Regions { + cop := coprocessor.Request{ + Tp: kv.ReqTypeDAG, + Data: req.Data, + StartTs: req.StartTs, + Ranges: ri.Ranges, + } + _, exec, dagReq, err := h.buildDAGExecutor(&cop, true) + if err != nil { + return nil, errors.Trace(err) + } + chunk, err := drainRowsFromExecutor(ctx, exec, dagReq) + if err != nil { + return nil, errors.Trace(err) + } + client.chunks = append(client.chunks, chunk) + } + return client, nil +} + +>>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) // Client is a client that sends RPC. // This is same with tikv.Client, define again for avoid circle import. type Client interface { diff --git a/store/tikv/batch_coprocessor.go b/store/tikv/batch_coprocessor.go index e73a5b89ac425..6d4eefd4d998e 100644 --- a/store/tikv/batch_coprocessor.go +++ b/store/tikv/batch_coprocessor.go @@ -64,7 +64,7 @@ func (rs *batchCopResponse) GetStartKey() kv.Key { // GetExecDetails is unavailable currently, because TiFlash has not collected exec details for batch cop. // TODO: Will fix in near future. 
func (rs *batchCopResponse) GetExecDetails() *execdetails.ExecDetails { - return &execdetails.ExecDetails{} + return rs.detail } // MemSize returns how many bytes of memory this response use @@ -287,7 +287,7 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task * for idx := 0; idx < len(tasks); idx++ { ret, err := b.handleTaskOnce(ctx, bo, tasks[idx]) if err != nil { - resp := &batchCopResponse{err: errors.Trace(err)} + resp := &batchCopResponse{err: errors.Trace(err), detail: new(execdetails.ExecDetails)} b.sendToRespCh(resp) break } @@ -395,9 +395,22 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro return errors.Trace(err) } - b.sendToRespCh(&batchCopResponse{ + resp := batchCopResponse{ pbResp: response, - }) + detail: new(execdetails.ExecDetails), + } + + resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond + resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes)) + resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes)) + for backoff := range bo.backoffTimes { + backoffName := backoff.String() + resp.detail.BackoffTimes[backoffName] = bo.backoffTimes[backoff] + resp.detail.BackoffSleep[backoffName] = time.Duration(bo.backoffSleepMS[backoff]) * time.Millisecond + } + resp.detail.CalleeAddress = task.storeAddr + + b.sendToRespCh(&resp) return } diff --git a/util/plancodec/id.go b/util/plancodec/id.go index b460bf86e70ec..25cc7b0e35a24 100644 --- a/util/plancodec/id.go +++ b/util/plancodec/id.go @@ -52,6 +52,8 @@ const ( TypeLimit = "Limit" // TypeHashJoin is the type of hash join. TypeHashJoin = "HashJoin" + // TypeBroadcastJoin is the type of broad cast join. + TypeBroadcastJoin = "BroadcastJoin" // TypeMergeJoin is the type of merge join. TypeMergeJoin = "MergeJoin" // TypeIndexJoin is the type of index look up join. From bee9f5569826830bbbd789eda21210a5b79f9cea Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 29 Jul 2020 22:10:26 +0800 Subject: [PATCH 2/4] refine code --- executor/builder.go | 5 ---- go.mod | 10 ++----- go.sum | 7 ++--- planner/core/find_best_task.go | 9 ------ planner/core/logical_plan_builder.go | 8 ++---- planner/core/physical_plans.go | 18 ------------ planner/core/plan_to_pb.go | 8 +----- planner/core/planbuilder.go | 27 ++---------------- planner/util/path.go | 6 ---- sessionctx/variable/varsutil.go | 9 ++---- sessionctx/variable/varsutil_test.go | 16 +---------- store/mockstore/mocktikv/rpc.go | 41 ---------------------------- 12 files changed, 12 insertions(+), 152 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 1e5e6abbfce8b..31069c13a6295 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2448,13 +2448,8 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic return tableReq, tableStreaming, tbl, err } -<<<<<<< HEAD func buildIndexReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { - indexReq, indexStreaming, err := b.constructDAGReq(plans) -======= -func buildIndexReq(b *executorBuilder, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) { indexReq, indexStreaming, err := b.constructDAGReq(plans, kv.TiKV) ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. 
(#17232) if err != nil { return nil, false, err } diff --git a/go.mod b/go.mod index 7d29addbaf88f..a7abfcdffee1f 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/pingcap/tidb +replace github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb => github.com/pingcap/parser v0.0.0-20200729094414-b5a528cdf2fc + require ( github.com/BurntSushi/toml v0.3.1 github.com/Jeffail/gabs/v2 v2.5.1 @@ -32,19 +34,11 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20200706115936-1e0910aabe6c github.com/pingcap/log v0.0.0-20200511115504-543df19646ad -<<<<<<< HEAD github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181 github.com/pingcap/sysutil v0.0.0-20200408114249-ed3bd6f7fdb1 github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible - github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce -======= - github.com/pingcap/parser v0.0.0-20200623082809-b74301ac298b - github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200714122454-1a64f969cb3c - github.com/pingcap/sysutil v0.0.0-20200715082929-4c47bcac246a - github.com/pingcap/tidb-tools v4.0.1+incompatible github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 diff --git a/go.sum b/go.sum index 61ee701293002..41877c353bc15 100644 --- a/go.sum +++ b/go.sum @@ -419,6 +419,8 @@ github.com/pingcap/parser v0.0.0-20200507022230-f3bf29096657/go.mod h1:9v0Edh8Ib github.com/pingcap/parser v0.0.0-20200603032439-c4ecb4508d2f/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4= github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb h1:v9iX5qIr8nG3QxMtlcTT+1DI0YD4HqABy7tuohbp28E= github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= +github.com/pingcap/parser v0.0.0-20200729094414-b5a528cdf2fc h1:7B+V5WJSLi0PTZ9hCHs0YGSRXoUhEj4TO1hZg1aEldQ= +github.com/pingcap/parser v0.0.0-20200729094414-b5a528cdf2fc/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= github.com/pingcap/pd/v4 v4.0.0-rc.1.0.20200422143320-428acd53eba2/go.mod h1:s+utZtXDznOiL24VK0qGmtoHjjXNsscJx3m1n8cC56s= github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181 h1:FM+PzdoR3fmWAJx3ug+p5aOgs5aZYwFkoDL7Potdsz0= github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200520083007-2c251bd8f181/go.mod h1:q4HTx/bA8aKBa4S7L+SQKHvjRPXCRV0tA0yRw0qkZSA= @@ -439,14 +441,9 @@ github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= -<<<<<<< HEAD -github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce h1:LDyY6Xh/Z/SHVQ10erWtoOwIxHSTtlpPQ9cvS+BfRMY= github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= -======= -github.com/pingcap/tipb v0.0.0-20200604070248-508f03b0b342/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 h1:ESL3eIt1kUt8IMvR1011ejZlAyDcOzw89ARvVHvpD5k= 
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 23ae8ac04601c..11947a354fee9 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -160,9 +160,6 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPl // combine best child tasks with parent physical plan. curTask := pp.attach2Task(childTasks...) -<<<<<<< HEAD - // enforce curTask property -======= if prop.IsFlashOnlyProp() { if _, ok := curTask.(*copTask); !ok { continue @@ -170,7 +167,6 @@ func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPl } // Enforce curTask property ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) if prop.Enforced { curTask = enforceProperty(prop, curTask, p.basePlan.ctx) } @@ -416,11 +412,7 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida continue } var currentCandidate *candidatePath -<<<<<<< HEAD if path.IsTablePath { - currentCandidate = ds.getTableCandidate(path, prop) -======= - if path.IsTablePath() { if path.StoreType == kv.TiFlash { if path.IsTiFlashGlobalRead && prop.TaskTp == property.CopTiFlashGlobalReadTaskType { currentCandidate = ds.getTableCandidate(path, prop) @@ -436,7 +428,6 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida if currentCandidate == nil { continue } ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) } else { coveredByIdx := isCoveringIndex(ds.schema.Columns, path.FullIdxCols, path.FullIdxColLens, ds.tableInfo.PKIsHandle) if len(path.AccessConds) > 0 || !prop.IsEmpty() || path.Forced || coveredByIdx { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index ea125bb910312..5c6f5093b3aaa 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2306,15 +2306,11 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, nodeType u switch hint.HintName.L { case TiDBMergeJoin, HintSMJ: -<<<<<<< HEAD sortMergeTables = append(sortMergeTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) -======= - sortMergeTables = append(sortMergeTables, tableNames2HintTableInfo(b.ctx, hint.HintName.L, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) case TiDBBroadCastJoin, HintBCJ: - BCTables = append(BCTables, tableNames2HintTableInfo(b.ctx, hint.HintName.L, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) + BCTables = append(BCTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) case HintBCJPreferLocal: - BCJPreferLocalTables = append(BCJPreferLocalTables, tableNames2HintTableInfo(b.ctx, hint.HintName.L, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) + BCJPreferLocalTables = append(BCJPreferLocalTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) 
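+// The two cases above handle the TiFlash join hints: broadcast_join(t1, t2, ...)
+// asks the planner to consider a TiFlash broadcast join for the named tables,
+// and broadcast_join_local(t) additionally prefers t as the locally read side,
+// making the other side the global read (see TestBroadcastJoin in
+// integration_serial_suite_out.json).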
case TiDBIndexNestedLoopJoin, HintINLJ: INLJTables = append(INLJTables, tableNames2HintTableInfo(b.ctx, hint.Tables, b.hintProcessor, nodeType, currentLevel)...) case HintINLHJ: diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 357bda244d9eb..af2f4e55aac96 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -439,30 +439,12 @@ type PhysicalMergeJoin struct { Desc bool } -<<<<<<< HEAD -======= // PhysicalBroadCastJoin only works for TiFlash Engine, which broadcast the small table to every replica of probe side of tables. type PhysicalBroadCastJoin struct { basePhysicalJoin globalChildIndex int } -// Clone implements PhysicalPlan interface. -func (p *PhysicalMergeJoin) Clone() (PhysicalPlan, error) { - cloned := new(PhysicalMergeJoin) - base, err := p.basePhysicalJoin.cloneWithSelf(cloned) - if err != nil { - return nil, err - } - cloned.basePhysicalJoin = *base - for _, cf := range p.CompareFuncs { - cloned.CompareFuncs = append(cloned.CompareFuncs, cf) - } - cloned.Desc = p.Desc - return cloned, nil -} - ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) // PhysicalLock is the physical operator of lock, which is used for `select ... for update` clause. type PhysicalLock struct { basePhysicalPlan diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index b9dba1e4e79b4..a81109b91c20d 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -149,18 +149,12 @@ func (p *PhysicalLimit) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*t } // ToPB implements PhysicalPlan ToPB interface. -<<<<<<< HEAD -func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error) { +func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { tsExec := &tipb.TableScan{ TableId: p.Table.ID, Columns: util.ColumnsToProto(p.Columns, p.Table.PKIsHandle), Desc: p.Desc, } -======= -func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) { - tsExec := tables.BuildTableScanFromInfos(p.Table, p.Columns) - tsExec.Desc = p.Desc ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. 
(#17232) if p.isPartition { tsExec.TableId = p.physicalTableID } diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 2271520415b08..d0964aed99e8f 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -735,31 +735,12 @@ func isPrimaryIndex(indexName model.CIStr) bool { return indexName.L == "primary" } -<<<<<<< HEAD -func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr) ([]*util.AccessPath, error) { -======= func genTiFlashPath(tblInfo *model.TableInfo, isGlobalRead bool) *util.AccessPath { - tiFlashPath := &util.AccessPath{StoreType: kv.TiFlash, IsTiFlashGlobalRead: isGlobalRead} - fillContentForTablePath(tiFlashPath, tblInfo) + tiFlashPath := &util.AccessPath{IsTablePath: true, StoreType: kv.TiFlash, IsTiFlashGlobalRead: isGlobalRead} return tiFlashPath } -func fillContentForTablePath(tablePath *util.AccessPath, tblInfo *model.TableInfo) { - if tblInfo.IsCommonHandle { - tablePath.IsCommonHandlePath = true - for _, index := range tblInfo.Indices { - if index.Primary { - tablePath.Index = index - break - } - } - } else { - tablePath.IsIntHandlePath = true - } -} - -func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr) ([]*util.AccessPath, error) { ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) +func (b *PlanBuilder) getPossibleAccessPaths(indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr) ([]*util.AccessPath, error) { tblInfo := tbl.Meta() publicPaths := make([]*util.AccessPath, 0, len(tblInfo.Indices)+2) tp := kv.TiKV @@ -768,12 +749,8 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i } publicPaths = append(publicPaths, &util.AccessPath{IsTablePath: true, StoreType: tp}) if tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Available { -<<<<<<< HEAD - publicPaths = append(publicPaths, &util.AccessPath{IsTablePath: true, StoreType: kv.TiFlash}) -======= publicPaths = append(publicPaths, genTiFlashPath(tblInfo, false)) publicPaths = append(publicPaths, genTiFlashPath(tblInfo, true)) ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) } for _, index := range tblInfo.Indices { if index.State == model.StatePublic { diff --git a/planner/util/path.go b/planner/util/path.go index 8c1ff581b0479..e95f17b3adf7c 100644 --- a/planner/util/path.go +++ b/planner/util/path.go @@ -50,17 +50,11 @@ type AccessPath struct { IsDNFCond bool -<<<<<<< HEAD // IsTablePath indicates whether this path is table path. IsTablePath bool -======= // IsTiFlashGlobalRead indicates whether this path is a remote read path for tiflash IsTiFlashGlobalRead bool - // IsIntHandlePath indicates whether this path is table path. - IsIntHandlePath bool - IsCommonHandlePath bool ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) // Forced means this path is generated by `use/force index()`. 
Forced bool diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 639fd41ba9742..e9214009fa7af 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -435,18 +435,13 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string, scope Sc return "ON", nil } return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) -<<<<<<< HEAD - case TiDBSkipUTF8Check, TiDBOptAggPushDown, TiDBOptDistinctAggPushDown, - TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze, -======= case TiDBOptBCJ: if (strings.EqualFold(value, "ON") || value == "1") && vars.AllowBatchCop == 0 { return value, ErrWrongValueForVar.GenWithStackByArgs("Can't set Broadcast Join to 1 but tidb_allow_batch_cop is 0, please active batch cop at first.") } return value, nil - case TiDBSkipUTF8Check, TiDBSkipASCIICheck, TiDBOptAggPushDown, - TiDBOptDistinctAggPushDown, TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze, ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) + case TiDBSkipUTF8Check, TiDBOptAggPushDown, TiDBOptDistinctAggPushDown, + TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze, TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBEnableChunkRPC, TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction, TiDBPProfSQLCPU, TiDBLowResolutionTSO, TiDBEnableIndexMerge, TiDBEnableNoopFuncs, diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 29ab3f190e34b..2444b288bd83a 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -65,26 +65,12 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.IndexLookupJoinConcurrency, Equals, DefIndexLookupJoinConcurrency) c.Assert(vars.HashJoinConcurrency, Equals, DefTiDBHashJoinConcurrency) c.Assert(vars.AllowBatchCop, Equals, DefTiDBAllowBatchCop) -<<<<<<< HEAD + c.Assert(vars.AllowBCJ, Equals, DefOptBCJ) c.Assert(vars.ProjectionConcurrency, Equals, int64(DefTiDBProjectionConcurrency)) c.Assert(vars.HashAggPartialConcurrency, Equals, DefTiDBHashAggPartialConcurrency) c.Assert(vars.HashAggFinalConcurrency, Equals, DefTiDBHashAggFinalConcurrency) c.Assert(vars.WindowConcurrency, Equals, DefTiDBWindowConcurrency) c.Assert(vars.DistSQLScanConcurrency, Equals, DefDistSQLScanConcurrency) -======= - c.Assert(vars.AllowBCJ, Equals, DefOptBCJ) - c.Assert(vars.projectionConcurrency, Equals, ConcurrencyUnset) - c.Assert(vars.hashAggPartialConcurrency, Equals, ConcurrencyUnset) - c.Assert(vars.hashAggFinalConcurrency, Equals, ConcurrencyUnset) - c.Assert(vars.windowConcurrency, Equals, ConcurrencyUnset) - c.Assert(vars.distSQLScanConcurrency, Equals, DefDistSQLScanConcurrency) - c.Assert(vars.ProjectionConcurrency(), Equals, DefExecutorConcurrency) - c.Assert(vars.HashAggPartialConcurrency(), Equals, DefExecutorConcurrency) - c.Assert(vars.HashAggFinalConcurrency(), Equals, DefExecutorConcurrency) - c.Assert(vars.WindowConcurrency(), Equals, DefExecutorConcurrency) - c.Assert(vars.DistSQLScanConcurrency(), Equals, DefDistSQLScanConcurrency) - c.Assert(vars.ExecutorConcurrency, Equals, DefExecutorConcurrency) ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. 
(#17232) c.Assert(vars.MaxChunkSize, Equals, DefMaxChunkSize) c.Assert(vars.DMLBatchSize, Equals, DefDMLBatchSize) c.Assert(vars.MemQuotaQuery, Equals, config.GetGlobalConfig().MemQuotaQuery) diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 7a16364c6834c..324e838d6b574 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -663,47 +663,6 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb return resp } -<<<<<<< HEAD -======= -func drainRowsFromExecutor(ctx context.Context, e executor, req *tipb.DAGRequest) (tipb.Chunk, error) { - var chunk tipb.Chunk - for { - row, err := e.Next(ctx) - if err != nil { - return chunk, errors.Trace(err) - } - if row == nil { - return chunk, nil - } - for _, offset := range req.OutputOffsets { - chunk.RowsData = append(chunk.RowsData, row[offset]...) - } - } -} - -func (h *rpcHandler) handleBatchCopRequest(ctx context.Context, req *coprocessor.BatchRequest) (*mockBatchCopDataClient, error) { - client := &mockBatchCopDataClient{} - for _, ri := range req.Regions { - cop := coprocessor.Request{ - Tp: kv.ReqTypeDAG, - Data: req.Data, - StartTs: req.StartTs, - Ranges: ri.Ranges, - } - _, exec, dagReq, err := h.buildDAGExecutor(&cop, true) - if err != nil { - return nil, errors.Trace(err) - } - chunk, err := drainRowsFromExecutor(ctx, exec, dagReq) - if err != nil { - return nil, errors.Trace(err) - } - client.chunks = append(client.chunks, chunk) - } - return client, nil -} - ->>>>>>> 29178df... planner, executor: support broadcast join for tiflash engine. (#17232) // Client is a client that sends RPC. // This is same with tikv.Client, define again for avoid circle import. type Client interface { From 5ae37cea590032d5151eba20193b59b0ee0ee468 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Tue, 4 Aug 2020 12:13:58 +0800 Subject: [PATCH 3/4] change go mod --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index a7abfcdffee1f..a15a93917163c 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,5 @@ module github.com/pingcap/tidb -replace github.com/pingcap/parser v0.0.0-20200623164729-3a18f1e5dceb => github.com/pingcap/parser v0.0.0-20200729094414-b5a528cdf2fc - require ( github.com/BurntSushi/toml v0.3.1 github.com/Jeffail/gabs/v2 v2.5.1 From 264ebe2d2571b203cea09bd8f85bd132f9010c15 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Fri, 14 Aug 2020 14:54:48 +0800 Subject: [PATCH 4/4] executor: fix bug of tiflash executing apply (#19182) Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com> --- executor/builder.go | 1 + executor/table_reader.go | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 872e819834a2e..05811fd990947 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2302,6 +2302,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea corColInFilter: b.corColInDistPlan(v.TablePlans), corColInAccess: b.corColInAccess(v.TablePlans[0]), plans: v.TablePlans, + tablePlan: v.GetTablePlan(), storeType: v.StoreType, } e.setBatchCop(v) diff --git a/executor/table_reader.go b/executor/table_reader.go index 02890fea5c510..dfce0292196da 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -71,6 +71,7 @@ type TableReaderExecutor struct { resultHandler *tableResultHandler feedback *statistics.QueryFeedback plans []plannercore.PhysicalPlan + tablePlan plannercore.PhysicalPlan memTracker *memory.Tracker 
selectResultHook // for testing @@ -105,9 +106,17 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error { var err error if e.corColInFilter { - e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) - if err != nil { - return err + if e.storeType == kv.TiFlash { + execs, _, err := constructDistExecForTiFlash(e.ctx, e.tablePlan) + if err != nil { + return err + } + e.dagPB.RootExecutor = execs[0] + } else { + e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) + if err != nil { + return err + } } } if e.runtimeStats != nil {