Skip to content

Commit

Permalink
planner, executor: support broadcast join for tiflash engine. (#17232) (
Browse files Browse the repository at this point in the history
#18801)

Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
ti-srebot committed Sep 1, 2020
1 parent bfdc09f commit 8b3ff32
Show file tree
Hide file tree
Showing 36 changed files with 851 additions and 97 deletions.
37 changes: 27 additions & 10 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,7 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan
streaming := true
executors := make([]*tipb.Executor, 0, len(plans))
for _, p := range plans {
execPB, err := p.ToPB(sctx)
execPB, err := p.ToPB(sctx, kv.TiKV)
if err != nil {
return nil, false, err
}
Expand All @@ -2002,7 +2002,13 @@ func markChildrenUsedCols(outputSchema *expression.Schema, childSchema ...*expre
return
}

func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
func constructDistExecForTiFlash(sctx sessionctx.Context, p plannercore.PhysicalPlan) ([]*tipb.Executor, bool, error) {
execPB, err := p.ToPB(sctx, kv.TiFlash)
return []*tipb.Executor{execPB}, false, err

}

func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan, storeType kv.StoreType) (dagReq *tipb.DAGRequest, streaming bool, err error) {
dagReq = &tipb.DAGRequest{}
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(b.ctx.GetSessionVars().Location())
sc := b.ctx.GetSessionVars().StmtCtx
Expand All @@ -2011,7 +2017,13 @@ func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dag
dagReq.CollectExecutionSummaries = &collExec
}
dagReq.Flags = sc.PushDownFlags()
dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans)
if storeType == kv.TiFlash {
var executors []*tipb.Executor
executors, streaming, err = constructDistExecForTiFlash(b.ctx, plans[0])
dagReq.RootExecutor = executors[0]
} else {
dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans)
}

distsql.SetEncodeType(b.ctx, dagReq)
return dagReq, streaming, err
Expand Down Expand Up @@ -2241,7 +2253,7 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) {
case 1:
for _, p := range v.TablePlans {
switch p.(type) {
case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN:
case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN, *plannercore.PhysicalBroadCastJoin:
e.batchCop = true
}
}
Expand All @@ -2252,11 +2264,15 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) {
}

func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) {
dagReq, streaming, err := b.constructDAGReq(v.TablePlans)
tablePlans := v.TablePlans
if v.StoreType == kv.TiFlash {
tablePlans = []plannercore.PhysicalPlan{v.GetTablePlan()}
}
dagReq, streaming, err := b.constructDAGReq(tablePlans, v.StoreType)
if err != nil {
return nil, err
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
ts := v.GetTableScan()
tbl, _ := b.is.TableByID(ts.Table.ID)
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
Expand All @@ -2279,6 +2295,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
corColInFilter: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.TablePlans[0]),
plans: v.TablePlans,
tablePlan: v.GetTablePlan(),
storeType: v.StoreType,
}
e.setBatchCop(v)
Expand Down Expand Up @@ -2323,15 +2340,15 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *
return nil
}

ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
ts := v.GetTableScan()
ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
return ret
}

func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, error) {
dagReq, streaming, err := b.constructDAGReq(v.IndexPlans)
dagReq, streaming, err := b.constructDAGReq(v.IndexPlans, kv.TiKV)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2405,7 +2422,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) *
}

func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, val table.Table, err error) {
tableReq, tableStreaming, err := b.constructDAGReq(plans)
tableReq, tableStreaming, err := b.constructDAGReq(plans, kv.TiKV)
if err != nil {
return nil, false, nil, err
}
Expand All @@ -2423,7 +2440,7 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic
}

func buildIndexReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
indexReq, indexStreaming, err := b.constructDAGReq(plans)
indexReq, indexStreaming, err := b.constructDAGReq(plans, kv.TiKV)
if err != nil {
return nil, false, err
}
Expand Down
15 changes: 12 additions & 3 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type TableReaderExecutor struct {
resultHandler *tableResultHandler
feedback *statistics.QueryFeedback
plans []plannercore.PhysicalPlan
tablePlan plannercore.PhysicalPlan

memTracker *memory.Tracker
selectResultHook // for testing
Expand Down Expand Up @@ -104,9 +105,17 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {

var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
if err != nil {
return err
if e.storeType == kv.TiFlash {
execs, _, err := constructDistExecForTiFlash(e.ctx, e.tablePlan)
if err != nil {
return err
}
e.dagPB.RootExecutor = execs[0]
} else {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
if err != nil {
return err
}
}
}
if e.runtimeStats != nil {
Expand Down
2 changes: 1 addition & 1 deletion executor/table_readers_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest {
Columns: []*model.ColumnInfo{},
Table: &model.TableInfo{ID: 12345, PKIsHandle: false},
Desc: false,
}})
}}, kv.TiKV)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/pingcap/pd/v4 v4.0.5-0.20200817114353-e465cafe8a91
github.com/pingcap/sysutil v0.0.0-20200715082929-4c47bcac246a
github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible
github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,9 @@ github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible
github.com/pingcap/tidb-tools v4.0.1-0.20200530144555-cdec43635625+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce h1:LDyY6Xh/Z/SHVQ10erWtoOwIxHSTtlpPQ9cvS+BfRMY=
github.com/pingcap/tipb v0.0.0-20200522051215-f31a15d98fce/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 h1:ESL3eIt1kUt8IMvR1011ejZlAyDcOzw89ARvVHvpD5k=
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
2 changes: 2 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,8 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, driverSide, indent st
buildSide = plan.InnerChildIdx ^ 1
case *PhysicalIndexHashJoin:
buildSide = plan.InnerChildIdx ^ 1
case *PhysicalBroadCastJoin:
buildSide = plan.InnerChildIdx
}

if buildSide != -1 {
Expand Down
Loading

0 comments on commit 8b3ff32

Please sign in to comment.