Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan,executor: support IndexJoin over UnionScan #7877

Merged
merged 9 commits into from
Oct 12, 2018
30 changes: 26 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,18 +670,22 @@ func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
}

func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
src := b.build(v.Children()[0])
reader := b.build(v.Children()[0])
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src)}
return b.buildUnionScanFromReader(reader, v)
}

func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)}
// Get the handle column index of the below plannercore.
// We can guarantee that there must be only one col in the map.
for _, cols := range v.Children()[0].Schema().TblID2Handle {
us.belowHandleIndex = cols[0].Index
}
switch x := src.(type) {
switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
Expand Down Expand Up @@ -718,7 +722,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E
b.err = us.buildAndSortAddedRows()
default:
// The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.
return src
return reader
}
if b.err != nil {
b.err = errors.Trace(b.err)
Expand Down Expand Up @@ -1814,10 +1818,28 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
}
return nil, errors.New("Wrong plan type for dataReaderBuilder")
}

func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
builder.Plan = v.Children()[0]
reader, err := builder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff)
if err != nil {
return nil, errors.Trace(err)
}
e := builder.buildUnionScanFromReader(reader, v)
if e == nil {
return nil, builder.err
}
us := e.(*UnionScanExec)
us.snapshotChunkBuffer = us.newFirstChunk()
return us, nil
}

func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) {
e, err := buildNoRangeTableReader(builder.executorBuilder, v)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,57 @@ func (s *testSuite) TestIndexLookupJoinHang(c *C) {
}
rs.Close()
}

func (s *testSuite) TestIndexJoinUnionScan(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t1(id int primary key, a int)")
tk.MustExec("create table t2(id int primary key, a int, b int, key idx_a(a))")
tk.MustExec("insert into t2 values (1,1,1),(4,2,4)")
tk.MustExec("begin")
tk.MustExec("insert into t1 values(2,2)")
tk.MustExec("insert into t2 values(2,2,2), (3,3,3)")
// TableScan below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
"IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.id",
"├─UnionScan_12 10000.00 root ",
"│ └─TableReader_14 10000.00 root data:TableScan_13",
"│ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
"└─UnionScan_10 10.00 root ",
" └─TableReader_9 10.00 root data:TableScan_8",
" └─TableScan_8 10.00 cop table:t2, range: decided by [test.t1.a], keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
"2 2 2 2 2",
))
// IndexLookUp below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"IndexJoin_12 12500.00 root inner join, inner:UnionScan_11, outer key:test.t1.a, inner key:test.t2.a",
"├─UnionScan_13 10000.00 root ",
"│ └─TableReader_15 10000.00 root data:TableScan_14",
"│ └─TableScan_14 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
"└─UnionScan_11 10.00 root ",
" └─IndexLookUp_10 10.00 root ",
" ├─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
" └─TableScan_9 10.00 cop table:t2, keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"2 2 2 2 2",
"2 2 4 2 4",
))
// IndexScan below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"Projection_7 12500.00 root test.t1.a, test.t2.a",
"└─IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.a",
" ├─UnionScan_12 10000.00 root ",
" │ └─TableReader_14 10000.00 root data:TableScan_13",
" │ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─UnionScan_10 10.00 root ",
" └─IndexReader_9 10.00 root index:IndexScan_8",
" └─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"2 2",
"2 2",
))
tk.MustExec("rollback")
}
52 changes: 33 additions & 19 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,20 +367,23 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
innerJoinKeys = p.LeftJoinKeys
outerJoinKeys = p.RightJoinKeys
}
x, ok := innerChild.(*DataSource)
if !ok {
ds, isDataSource := innerChild.(*DataSource)
us, isUnionScan := innerChild.(*LogicalUnionScan)
if !isDataSource && !isUnionScan {
return nil
}
if isUnionScan {
ds = us.Children()[0].(*DataSource)
}
var tblPath *accessPath
for _, path := range x.possibleAccessPaths {
for _, path := range ds.possibleAccessPaths {
if path.isTablePath {
tblPath = path
break
}
}
if pkCol := x.getPKIsHandleCol(); pkCol != nil && tblPath != nil {
if pkCol := ds.getPKIsHandleCol(); pkCol != nil && tblPath != nil {
keyOff2IdxOff := make([]int, len(innerJoinKeys))
pkCol := x.getPKIsHandleCol()
pkMatched := false
for i, key := range innerJoinKeys {
if !key.Equal(nil, pkCol) {
Expand All @@ -391,7 +394,7 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
keyOff2IdxOff[i] = 0
}
if pkMatched {
innerPlan := p.constructInnerTableScan(x, pkCol, outerJoinKeys)
innerPlan := p.constructInnerTableScan(ds, pkCol, outerJoinKeys, us)
// Since the primary key means one value corresponding to exact one row, this will always be a no worse one
// comparing to other index.
return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, nil, keyOff2IdxOff)
Expand All @@ -404,12 +407,12 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
remainedOfBest []expression.Expression
keyOff2IdxOff []int
)
for _, path := range x.possibleAccessPaths {
for _, path := range ds.possibleAccessPaths {
if path.isTablePath {
continue
}
indexInfo := path.index
ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, x, innerJoinKeys)
ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, ds, innerJoinKeys)
// We choose the index by the number of used columns of the range, the much the better.
// Notice that there may be the cases like `t1.a=t2.a and b > 2 and b < 1`. So ranges can be nil though the conditions are valid.
// But obviously when the range is nil, we don't need index join.
Expand All @@ -422,20 +425,15 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
}
}
if bestIndexInfo != nil {
innerPlan := p.constructInnerIndexScan(x, bestIndexInfo, remainedOfBest, outerJoinKeys)
innerPlan := p.constructInnerIndexScan(ds, bestIndexInfo, remainedOfBest, outerJoinKeys, us)
return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, rangesOfBest, keyOff2IdxOff)
}
return nil
}

// constructInnerTableScan is specially used to construct the inner plan for PhysicalIndexJoin.
func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column) PhysicalPlan {
var ranges []*ranger.Range
if pk != nil {
ranges = ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag))
} else {
ranges = ranger.FullIntRange(false)
}
func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan {
ranges := ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag))
ts := PhysicalTableScan{
Table: ds.tableInfo,
Columns: ds.Columns,
Expand Down Expand Up @@ -464,11 +462,19 @@ func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Col
}
ts.addPushedDownSelection(copTask, ds.stats)
t := finishCopTask(ds.ctx, copTask)
return t.plan()
scan := t.plan()
if us != nil {
// Use `ts.stats` instead of `us.stats` because it should be more accurate. No need to specify
// childrenReqProps now since we have got ts already.
physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, ts.stats, nil)
physicalUnionScan.SetChildren(scan)
scan = physicalUnionScan
}
return scan
}

// constructInnerIndexScan is specially used to construct the inner plan for PhysicalIndexJoin.
func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column) PhysicalPlan {
func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan {
is := PhysicalIndexScan{
Table: ds.tableInfo,
TableAsName: ds.TableAsName,
Expand Down Expand Up @@ -507,7 +513,15 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn
path := &accessPath{indexFilters: indexConds, tableFilters: tblConds, countAfterIndex: math.MaxFloat64}
is.addPushedDownSelection(cop, ds, math.MaxFloat64, path)
t := finishCopTask(ds.ctx, cop)
return t.plan()
scan := t.plan()
if us != nil {
// Use `is.stats` instead of `us.stats` because it should be more accurate. No need to specify
// childrenReqProps now since we have got is already.
physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, is.stats, nil)
physicalUnionScan.SetChildren(scan)
scan = physicalUnionScan
}
return scan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

465-473 and 516-524 maybe can extract a method, except this LGTM

}

// buildRangeForIndexJoin checks whether this index can be used for building index join and return the range if this index is ok.
Expand Down
11 changes: 7 additions & 4 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func (p *LogicalTableDual) findBestTask(prop *property.PhysicalProperty) (task,

// findBestTask implements LogicalPlan interface.
func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTask task, err error) {
// If p is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
// and set inner child prop nil, so here we do nothing.
if prop == nil {
return nil, nil
}
// Look up the task with this prop in the task map.
// It's used to reduce double counting.
bestTask = p.getTask(prop)
Expand Down Expand Up @@ -199,10 +204,8 @@ func (ds *DataSource) tryToGetDualTask() (task, error) {
// findBestTask implements the PhysicalPlan interface.
// It will enumerate all the available indices and choose a plan with least cost.
func (ds *DataSource) findBestTask(prop *property.PhysicalProperty) (t task, err error) {
// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself.
// So here we do nothing.
// TODO: Add a special prop to handle IndexJoin's inner plan.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep this TODO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have thought about adding this special prop in this patch, and then realized that current solution using nil for quick return is clear enough, so I removed this TODO.

// Then we can remove forceToTableScan and forceToIndexScan.
// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
// and set inner child prop nil, so here we do nothing.
if prop == nil {
return nil, nil
}
Expand Down
47 changes: 47 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,3 +1269,50 @@ func (s *testPlanSuite) TestRequestTypeSupportedOff(c *C) {
c.Assert(err, IsNil)
c.Assert(core.ToString(p), Equals, expect, Commentf("for %s", sql))
}

func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
defer func() {
dom.Close()
store.Close()
}()
se, err := session.CreateSession4Test(store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test")
c.Assert(err, IsNil)
tests := []struct {
sql string
best string
}{
// Test Index Join + UnionScan + TableScan.
{
sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a",
best: "IndexJoin{TableReader(Table(t))->UnionScan([])->TableReader(Table(t))->UnionScan([])}(t1.a,t2.a)",
},
// Test Index Join + UnionScan + DoubleRead.
{
sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.c",
best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexLookUp(Index(t.c_d_e)[[NULL,+inf]], Table(t))->UnionScan([])}(t1.a,t2.c)",
},
// Test Index Join + UnionScan + IndexScan.
{
sql: "select /*+ TIDB_INLJ(t1, t2) */ t1.a , t2.c from t t1, t t2 where t1.a = t2.c",
best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexReader(Index(t.c_d_e)[[NULL,+inf]])->UnionScan([])}(t1.a,t2.c)->Projection",
},
}
for i, tt := range tests {
comment := Commentf("case:%v sql:%s", i, tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
err = se.NewTxn()
c.Assert(err, IsNil)
// Make txn not read only.
se.Txn().Set(kv.Key("AAA"), []byte("BBB"))
se.StmtCommit()
p, err := core.Optimize(se, stmt, s.is)
c.Assert(err, IsNil)
c.Assert(core.ToString(p), Equals, tt.best, comment)
}
}