Skip to content

Commit

Permalink
plan,executor: support IndexJoin over UnionScan (#7877)
Browse files Browse the repository at this point in the history
  • Loading branch information
eurekaka authored and zz-jason committed Oct 12, 2018
1 parent b3ef641 commit 5efcacb
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 27 deletions.
30 changes: 26 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,18 +670,22 @@ func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
}

func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
src := b.build(v.Children()[0])
reader := b.build(v.Children()[0])
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src)}
return b.buildUnionScanFromReader(reader, v)
}

func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor {
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)}
// Get the handle column index of the below plannercore.
// We can guarantee that there must be only one col in the map.
for _, cols := range v.Children()[0].Schema().TblID2Handle {
us.belowHandleIndex = cols[0].Index
}
switch x := src.(type) {
switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
Expand Down Expand Up @@ -718,7 +722,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E
b.err = us.buildAndSortAddedRows()
default:
// The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.
return src
return reader
}
if b.err != nil {
b.err = errors.Trace(b.err)
Expand Down Expand Up @@ -1814,10 +1818,28 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
}
return nil, errors.New("Wrong plan type for dataReaderBuilder")
}

func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
builder.Plan = v.Children()[0]
reader, err := builder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff)
if err != nil {
return nil, errors.Trace(err)
}
e := builder.buildUnionScanFromReader(reader, v)
if e == nil {
return nil, builder.err
}
us := e.(*UnionScanExec)
us.snapshotChunkBuffer = us.newFirstChunk()
return us, nil
}

func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) {
e, err := buildNoRangeTableReader(builder.executorBuilder, v)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,57 @@ func (s *testSuite) TestIndexLookupJoinHang(c *C) {
}
rs.Close()
}

func (s *testSuite) TestIndexJoinUnionScan(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t1(id int primary key, a int)")
tk.MustExec("create table t2(id int primary key, a int, b int, key idx_a(a))")
tk.MustExec("insert into t2 values (1,1,1),(4,2,4)")
tk.MustExec("begin")
tk.MustExec("insert into t1 values(2,2)")
tk.MustExec("insert into t2 values(2,2,2), (3,3,3)")
// TableScan below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
"IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.id",
"├─UnionScan_12 10000.00 root ",
"│ └─TableReader_14 10000.00 root data:TableScan_13",
"│ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
"└─UnionScan_10 10.00 root ",
" └─TableReader_9 10.00 root data:TableScan_8",
" └─TableScan_8 10.00 cop table:t2, range: decided by [test.t1.a], keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
"2 2 2 2 2",
))
// IndexLookUp below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"IndexJoin_12 12500.00 root inner join, inner:UnionScan_11, outer key:test.t1.a, inner key:test.t2.a",
"├─UnionScan_13 10000.00 root ",
"│ └─TableReader_15 10000.00 root data:TableScan_14",
"│ └─TableScan_14 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
"└─UnionScan_11 10.00 root ",
" └─IndexLookUp_10 10.00 root ",
" ├─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
" └─TableScan_9 10.00 cop table:t2, keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"2 2 2 2 2",
"2 2 4 2 4",
))
// IndexScan below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"Projection_7 12500.00 root test.t1.a, test.t2.a",
"└─IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.a",
" ├─UnionScan_12 10000.00 root ",
" │ └─TableReader_14 10000.00 root data:TableScan_13",
" │ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─UnionScan_10 10.00 root ",
" └─IndexReader_9 10.00 root index:IndexScan_8",
" └─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"2 2",
"2 2",
))
tk.MustExec("rollback")
}
49 changes: 30 additions & 19 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,20 +367,23 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
innerJoinKeys = p.LeftJoinKeys
outerJoinKeys = p.RightJoinKeys
}
x, ok := innerChild.(*DataSource)
if !ok {
ds, isDataSource := innerChild.(*DataSource)
us, isUnionScan := innerChild.(*LogicalUnionScan)
if !isDataSource && !isUnionScan {
return nil
}
if isUnionScan {
ds = us.Children()[0].(*DataSource)
}
var tblPath *accessPath
for _, path := range x.possibleAccessPaths {
for _, path := range ds.possibleAccessPaths {
if path.isTablePath {
tblPath = path
break
}
}
if pkCol := x.getPKIsHandleCol(); pkCol != nil && tblPath != nil {
if pkCol := ds.getPKIsHandleCol(); pkCol != nil && tblPath != nil {
keyOff2IdxOff := make([]int, len(innerJoinKeys))
pkCol := x.getPKIsHandleCol()
pkMatched := false
for i, key := range innerJoinKeys {
if !key.Equal(nil, pkCol) {
Expand All @@ -391,7 +394,7 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
keyOff2IdxOff[i] = 0
}
if pkMatched {
innerPlan := p.constructInnerTableScan(x, pkCol, outerJoinKeys)
innerPlan := p.constructInnerTableScan(ds, pkCol, outerJoinKeys, us)
// Since the primary key means one value corresponding to exact one row, this will always be a no worse one
// comparing to other index.
return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, nil, keyOff2IdxOff)
Expand All @@ -404,12 +407,12 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
remainedOfBest []expression.Expression
keyOff2IdxOff []int
)
for _, path := range x.possibleAccessPaths {
for _, path := range ds.possibleAccessPaths {
if path.isTablePath {
continue
}
indexInfo := path.index
ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, x, innerJoinKeys)
ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, ds, innerJoinKeys)
// We choose the index by the number of used columns of the range, the much the better.
// Notice that there may be the cases like `t1.a=t2.a and b > 2 and b < 1`. So ranges can be nil though the conditions are valid.
// But obviously when the range is nil, we don't need index join.
Expand All @@ -422,20 +425,15 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
}
}
if bestIndexInfo != nil {
innerPlan := p.constructInnerIndexScan(x, bestIndexInfo, remainedOfBest, outerJoinKeys)
innerPlan := p.constructInnerIndexScan(ds, bestIndexInfo, remainedOfBest, outerJoinKeys, us)
return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, rangesOfBest, keyOff2IdxOff)
}
return nil
}

// constructInnerTableScan is specially used to construct the inner plan for PhysicalIndexJoin.
func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column) PhysicalPlan {
var ranges []*ranger.Range
if pk != nil {
ranges = ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag))
} else {
ranges = ranger.FullIntRange(false)
}
func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan {
ranges := ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag))
ts := PhysicalTableScan{
Table: ds.tableInfo,
Columns: ds.Columns,
Expand Down Expand Up @@ -464,11 +462,23 @@ func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Col
}
ts.addPushedDownSelection(copTask, ds.stats)
t := finishCopTask(ds.ctx, copTask)
return t.plan()
reader := t.plan()
return p.constructInnerUnionScan(us, reader)
}

func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan {
if us == nil {
return reader
}
// Use `reader.stats` instead of `us.stats` because it should be more accurate. No need to specify
// childrenReqProps now since we have got reader already.
physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, reader.statsInfo(), nil)
physicalUnionScan.SetChildren(reader)
return physicalUnionScan
}

// constructInnerIndexScan is specially used to construct the inner plan for PhysicalIndexJoin.
func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column) PhysicalPlan {
func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan {
is := PhysicalIndexScan{
Table: ds.tableInfo,
TableAsName: ds.TableAsName,
Expand Down Expand Up @@ -507,7 +517,8 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn
path := &accessPath{indexFilters: indexConds, tableFilters: tblConds, countAfterIndex: math.MaxFloat64}
is.addPushedDownSelection(cop, ds, math.MaxFloat64, path)
t := finishCopTask(ds.ctx, cop)
return t.plan()
reader := t.plan()
return p.constructInnerUnionScan(us, reader)
}

// buildRangeForIndexJoin checks whether this index can be used for building index join and return the range if this index is ok.
Expand Down
11 changes: 7 additions & 4 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ func (p *LogicalTableDual) findBestTask(prop *property.PhysicalProperty) (task,

// findBestTask implements LogicalPlan interface.
func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTask task, err error) {
// If p is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
// and set inner child prop nil, so here we do nothing.
if prop == nil {
return nil, nil
}
// Look up the task with this prop in the task map.
// It's used to reduce double counting.
bestTask = p.getTask(prop)
Expand Down Expand Up @@ -199,10 +204,8 @@ func (ds *DataSource) tryToGetDualTask() (task, error) {
// findBestTask implements the PhysicalPlan interface.
// It will enumerate all the available indices and choose a plan with least cost.
func (ds *DataSource) findBestTask(prop *property.PhysicalProperty) (t task, err error) {
// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself.
// So here we do nothing.
// TODO: Add a special prop to handle IndexJoin's inner plan.
// Then we can remove forceToTableScan and forceToIndexScan.
// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
// and set inner child prop nil, so here we do nothing.
if prop == nil {
return nil, nil
}
Expand Down
47 changes: 47 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,3 +1270,50 @@ func (s *testPlanSuite) TestRequestTypeSupportedOff(c *C) {
c.Assert(err, IsNil)
c.Assert(core.ToString(p), Equals, expect, Commentf("for %s", sql))
}

func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
defer func() {
dom.Close()
store.Close()
}()
se, err := session.CreateSession4Test(store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test")
c.Assert(err, IsNil)
tests := []struct {
sql string
best string
}{
// Test Index Join + UnionScan + TableScan.
{
sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a",
best: "IndexJoin{TableReader(Table(t))->UnionScan([])->TableReader(Table(t))->UnionScan([])}(t1.a,t2.a)",
},
// Test Index Join + UnionScan + DoubleRead.
{
sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.c",
best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexLookUp(Index(t.c_d_e)[[NULL,+inf]], Table(t))->UnionScan([])}(t1.a,t2.c)",
},
// Test Index Join + UnionScan + IndexScan.
{
sql: "select /*+ TIDB_INLJ(t1, t2) */ t1.a , t2.c from t t1, t t2 where t1.a = t2.c",
best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexReader(Index(t.c_d_e)[[NULL,+inf]])->UnionScan([])}(t1.a,t2.c)->Projection",
},
}
for i, tt := range tests {
comment := Commentf("case:%v sql:%s", i, tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)
err = se.NewTxn()
c.Assert(err, IsNil)
// Make txn not read only.
se.Txn().Set(kv.Key("AAA"), []byte("BBB"))
se.StmtCommit()
p, err := planner.Optimize(se, stmt, s.is)
c.Assert(err, IsNil)
c.Assert(core.ToString(p), Equals, tt.best, comment)
}
}

0 comments on commit 5efcacb

Please sign in to comment.