Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, executor: support broadcast join for tiflash engine. #17232

Merged
merged 95 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
9122e31
support batch cop for tiflash
hanfei1991 Mar 7, 2020
8456ef3
Merge branch 'master' into hanfei/batch_cop
hanfei1991 Mar 9, 2020
460d5a8
support batch cop
hanfei1991 Mar 10, 2020
7a147b6
support join push down to tiflash
hanfei1991 Mar 3, 2020
943863b
refine
hanfei1991 Mar 4, 2020
a654fda
change pb
hanfei1991 Mar 4, 2020
2d7e3c0
push join
hanfei1991 Mar 4, 2020
7b6ad95
fix
hanfei1991 Mar 5, 2020
4aae81b
add hint
hanfei1991 Mar 7, 2020
7c49728
refine hint
hanfei1991 Mar 7, 2020
a0a88f3
add ranges
hanfei1991 Mar 8, 2020
e90731b
fix
hanfei1991 Mar 9, 2020
239cc17
fix
hanfei1991 Mar 10, 2020
00d417a
fix push down
hanfei1991 Mar 10, 2020
deb6fd2
fix index
hanfei1991 Mar 11, 2020
55477c7
enable distsql for join
hanfei1991 Mar 12, 2020
c01e9a5
Merge branch 'master' into hanfei/join-merge
hanfei1991 Mar 12, 2020
1dbbb56
add a session var to disable/enable broadcast join
windtalker Mar 10, 2020
c4b2e13
fix bug
windtalker Mar 13, 2020
49ff997
Merge branch 'master' of https://github.com/pingcap/tidb into hanfei/…
windtalker Mar 13, 2020
35e66a1
Merge branch 'master' into hanfei/join-merge
hanfei1991 Mar 15, 2020
2c92a29
fix bug
windtalker Mar 16, 2020
05ade28
tiny fix
hanfei1991 Mar 16, 2020
4f14acf
enable cast decimal pushdown to tiflash
windtalker Mar 16, 2020
ea9a0f1
Merge branch 'master' of https://github.com/pingcap/tidb into hanfei/…
windtalker Mar 17, 2020
cd7f225
fix
hanfei1991 Mar 17, 2020
bf8bf6e
merge master
windtalker Mar 30, 2020
04b4cf6
fix bc join bug
windtalker Mar 30, 2020
3d1c0ed
Merge branch 'hanfei/join-merge' of https://github.com/hanfei1991/tid…
windtalker Mar 30, 2020
40e3990
make broadcast plan stable
windtalker Mar 31, 2020
617fde6
refine code
windtalker Mar 31, 2020
d3ce0bd
fix bug
windtalker Mar 31, 2020
bb22eb6
Merge pull request #2 from windtalker/hanfei/join-merge
hanfei1991 Mar 31, 2020
2580fca
basic support for multi table broadcast join
windtalker Apr 2, 2020
8a51e02
fix bug
windtalker Apr 3, 2020
d2ee6e9
Merge pull request #3 from windtalker/hanfei/join-merge
hanfei1991 Apr 6, 2020
0d08302
basic cbo for broadcast join
windtalker Apr 8, 2020
1b165c5
improve
windtalker Apr 8, 2020
ccfb91c
fix bug
windtalker Apr 8, 2020
a5277b1
Merge branch 'master' into hanfei/join-merge
hanfei1991 Apr 9, 2020
9724824
remote useless code
hanfei1991 Apr 9, 2020
d931b18
add tests
windtalker Apr 13, 2020
f1f5163
merge join-merge branch
windtalker Apr 13, 2020
7774284
Merge pull request #4 from windtalker/hanfei/join-merge
windtalker Apr 13, 2020
7a2eede
pass unit tests
hanfei1991 Apr 20, 2020
ed3065d
refine code
hanfei1991 Apr 21, 2020
e04f61d
Merge pull request #5 from hanfei1991/join-merge-pass-test
windtalker Apr 21, 2020
8a7fc25
support execute summary info for broadcast join
windtalker Apr 22, 2020
87d9fdf
fix bug in explain for broadcast join
windtalker Apr 22, 2020
23bf158
format code
windtalker Apr 22, 2020
86cc61a
remove un-needed code
windtalker Apr 22, 2020
7b43599
fix make dev
hanfei1991 Apr 22, 2020
d85145b
address comments
windtalker Apr 23, 2020
cff4a84
Merge pull request #6 from windtalker/hanfei/join-merge
hanfei1991 Apr 23, 2020
a6b79ca
Hanfei/join merge (#7)
windtalker May 6, 2020
f13a6cd
fix test
hanfei1991 May 6, 2020
39b046c
change tidb_opt_broadcast_join to global vars
hanfei1991 May 7, 2020
5df59c9
Merge pull request #9 from hanfei1991/bcj-global
windtalker May 8, 2020
a28e1ee
Ban cartesian join to be pushed down to TiFlash (#8)
ichn-hu May 8, 2020
e2659a1
merge master (#10)
windtalker May 9, 2020
69fd90d
check session var conflict
hanfei1991 May 9, 2020
2d1f0d2
Merge pull request #11 from hanfei1991/hanfei/join-merge-session-vars
windtalker May 9, 2020
a6d5cbb
Merge branch 'master' into hanfei/join-merge
hanfei1991 May 9, 2020
694fa00
Add perfer local hint for broadcast join (#12)
ichn-hu May 11, 2020
00f2da1
refine planner
hanfei1991 May 13, 2020
1ce94ee
refine comments
hanfei1991 May 13, 2020
a469308
Merge pull request #13 from hanfei1991/hanfei/join-merge-plan
windtalker May 13, 2020
df63b06
Merge branch 'master' into hanfei/join-merge
hanfei1991 May 14, 2020
63f4b8b
Merge branch 'hanfei/join-merge' of github.com:hanfei1991/tidb into h…
hanfei1991 May 14, 2020
94ece03
fix make dev
hanfei1991 May 17, 2020
e602a4d
Merge branch 'master' into hanfei/join-merge
hanfei1991 May 18, 2020
80db9b4
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jun 15, 2020
691acc5
fix make dev
hanfei1991 Jun 15, 2020
a750bdc
update parser
hanfei1991 Jun 17, 2020
0a6b1ca
address comments
hanfei1991 Jun 17, 2020
635ce4e
fix make dev
hanfei1991 Jun 18, 2020
4eada0d
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jun 19, 2020
4af80d5
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jun 19, 2020
b0748f9
disable broadcast join when new collation is enabled
windtalker Jun 23, 2020
23fc80c
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jun 23, 2020
89f3eab
Merge branch 'hanfei/join-merge' of github.com:hanfei1991/tidb into h…
hanfei1991 Jun 23, 2020
82ce641
Update planner/core/exhaust_physical_plans.go
hanfei1991 Jun 23, 2020
b760e78
address comments
hanfei1991 Jun 24, 2020
f69bb5b
fix
hanfei1991 Jun 24, 2020
ab57fa7
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jun 24, 2020
4ea8cfe
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jul 3, 2020
16531a0
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jul 6, 2020
499f7b8
address comments
hanfei1991 Jul 7, 2020
e5e8cdd
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jul 13, 2020
323355d
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jul 15, 2020
adc2d04
fix tests
hanfei1991 Jul 17, 2020
c34034a
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jul 17, 2020
83a1761
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jul 20, 2020
84a3539
address comments
hanfei1991 Jul 21, 2020
e871df4
Merge branch 'master' into hanfei/join-merge
hanfei1991 Jul 27, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,14 @@ func (r *selectResult) updateCopRuntimeStats(detail *execdetails.ExecDetails, re
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
planID := ""
if detail.GetExecutorId() != "" {
planID = detail.GetExecutorId()
} else {
planID = r.copPlanIDs[i].String()
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID.String(), callee, detail)
RecordOneCopTask(planID, callee, detail)
}
}
}
Expand Down
36 changes: 26 additions & 10 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2055,7 +2055,7 @@ func constructDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan
streaming := true
executors := make([]*tipb.Executor, 0, len(plans))
for _, p := range plans {
execPB, err := p.ToPB(sctx)
execPB, err := p.ToPB(sctx, kv.TiKV)
if err != nil {
return nil, false, err
}
Expand All @@ -2077,7 +2077,13 @@ func markChildrenUsedCols(outputSchema *expression.Schema, childSchema ...*expre
return
}

func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
func constructDistExecForTiFlash(sctx sessionctx.Context, p plannercore.PhysicalPlan) ([]*tipb.Executor, bool, error) {
execPB, err := p.ToPB(sctx, kv.TiFlash)
return []*tipb.Executor{execPB}, false, err

}

func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan, storeType kv.StoreType) (dagReq *tipb.DAGRequest, streaming bool, err error) {
dagReq = &tipb.DAGRequest{}
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(b.ctx.GetSessionVars().Location())
sc := b.ctx.GetSessionVars().StmtCtx
Expand All @@ -2086,7 +2092,13 @@ func (b *executorBuilder) constructDAGReq(plans []plannercore.PhysicalPlan) (dag
dagReq.CollectExecutionSummaries = &collExec
}
dagReq.Flags = sc.PushDownFlags()
dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans)
if storeType == kv.TiFlash {
var executors []*tipb.Executor
executors, streaming, err = constructDistExecForTiFlash(b.ctx, plans[0])
dagReq.RootExecutor = executors[0]
} else {
dagReq.Executors, streaming, err = constructDistExec(b.ctx, plans)
}

distsql.SetEncodeType(b.ctx, dagReq)
return dagReq, streaming, err
Expand Down Expand Up @@ -2316,7 +2328,7 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) {
case 1:
for _, p := range v.TablePlans {
switch p.(type) {
case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN:
case *plannercore.PhysicalHashAgg, *plannercore.PhysicalStreamAgg, *plannercore.PhysicalTopN, *plannercore.PhysicalBroadCastJoin:
e.batchCop = true
}
}
Expand All @@ -2327,11 +2339,15 @@ func (e *TableReaderExecutor) setBatchCop(v *plannercore.PhysicalTableReader) {
}

func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableReader) (*TableReaderExecutor, error) {
dagReq, streaming, err := b.constructDAGReq(v.TablePlans)
tablePlans := v.TablePlans
if v.StoreType == kv.TiFlash {
tablePlans = []plannercore.PhysicalPlan{v.GetTablePlan()}
}
dagReq, streaming, err := b.constructDAGReq(tablePlans, v.StoreType)
if err != nil {
return nil, err
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
ts := v.GetTableScan()
tbl, _ := b.is.TableByID(ts.Table.ID)
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
Expand Down Expand Up @@ -2398,15 +2414,15 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *
return nil
}

ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
ts := v.GetTableScan()
ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
return ret
}

func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexReader) (*IndexReaderExecutor, error) {
dagReq, streaming, err := b.constructDAGReq(v.IndexPlans)
dagReq, streaming, err := b.constructDAGReq(v.IndexPlans, kv.TiKV)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2480,7 +2496,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) *
}

func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, val table.Table, err error) {
tableReq, tableStreaming, err := b.constructDAGReq(plans)
tableReq, tableStreaming, err := b.constructDAGReq(plans, kv.TiKV)
if err != nil {
return nil, false, nil, err
}
Expand All @@ -2498,7 +2514,7 @@ func buildTableReq(b *executorBuilder, schemaLen int, plans []plannercore.Physic
}

func buildIndexReq(b *executorBuilder, schemaLen, handleLen int, plans []plannercore.PhysicalPlan) (dagReq *tipb.DAGRequest, streaming bool, err error) {
indexReq, indexStreaming, err := b.constructDAGReq(plans)
indexReq, indexStreaming, err := b.constructDAGReq(plans, kv.TiKV)
if err != nil {
return nil, false, err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/table_readers_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func buildMockDAGRequest(sctx sessionctx.Context) *tipb.DAGRequest {
Columns: []*model.ColumnInfo{},
Table: &model.TableInfo{ID: 12345, PKIsHandle: false},
Desc: false,
}})
}}, kv.TiKV)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/pd/v4 v4.0.0-rc.2.0.20200714122454-1a64f969cb3c
github.com/pingcap/sysutil v0.0.0-20200715082929-4c47bcac246a
github.com/pingcap/tidb-tools v4.0.1+incompatible
github.com/pingcap/tipb v0.0.0-20200615034523-dcfcea0b5965
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,8 @@ github.com/pingcap/tidb-tools v4.0.1+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnw
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200604070248-508f03b0b342/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200615034523-dcfcea0b5965 h1:a0kZ+iaj/sbzJa5mt5310t1XJSpY+wmmIauAkrr7gU4=
github.com/pingcap/tipb v0.0.0-20200615034523-dcfcea0b5965/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3 h1:ESL3eIt1kUt8IMvR1011ejZlAyDcOzw89ARvVHvpD5k=
github.com/pingcap/tipb v0.0.0-20200618092958-4fad48b4c8c3/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
2 changes: 2 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,8 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, driverSide, indent st
buildSide = plan.InnerChildIdx ^ 1
case *PhysicalIndexHashJoin:
buildSide = plan.InnerChildIdx ^ 1
case *PhysicalBroadCastJoin:
buildSide = plan.InnerChildIdx
}

if buildSide != -1 {
Expand Down
Loading