Skip to content

Commit

Permalink
executor: fix build table reader for index join on partition table (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Aug 20, 2020
1 parent 172a2d3 commit 47526d3
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 24 deletions.
42 changes: 38 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2936,12 +2936,20 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
if err != nil {
return nil, err
}
tbInfo := e.table.Meta()
if v.IsCommonHandle {
kvRanges, err := buildKvRangesForIndexJoin(e.ctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
if err != nil {
return nil, err
}
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
if tbInfo.GetPartitionInfo() == nil {
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
}
e.kvRangeBuilder = kvRangeBuilderFromFunc(func(pid int64) ([]kv.KeyRange, error) {
return buildKvRangesForIndexJoin(e.ctx, pid, -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc)
})
nextPartition := nextPartitionForTableReader{e}
return buildPartitionTable(builder.executorBuilder, tbInfo, v.PartitionTable.PruningConds, v.PartitionTable.PartitionNames, e, nextPartition)
}
handles := make([]kv.Handle, 0, len(lookUpContents))
for _, content := range lookUpContents {
Expand All @@ -2957,10 +2965,36 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
handles = append(handles, handle)
}
}
return builder.buildTableReaderFromHandles(ctx, e, handles)

if tbInfo.GetPartitionInfo() == nil {
return builder.buildTableReaderFromHandles(ctx, e, handles)
}
if tryOldPartitionImplementation(builder.ctx) {
return builder.buildTableReaderFromHandles(ctx, e, handles)
}

e.kvRangeBuilder = kvRangeBuilderFromHandles(handles)
nextPartition := nextPartitionForTableReader{e}
return buildPartitionTable(builder.executorBuilder, tbInfo, v.PartitionTable.PruningConds, v.PartitionTable.PartitionNames, e, nextPartition)
}

type kvRangeBuilderFromFunc func(pid int64) ([]kv.KeyRange, error)

func (h kvRangeBuilderFromFunc) buildKeyRange(pid int64) ([]kv.KeyRange, error) {
return h(pid)
}

type kvRangeBuilderFromHandles []kv.Handle

func (h kvRangeBuilderFromHandles) buildKeyRange(pid int64) ([]kv.KeyRange, error) {
handles := []kv.Handle(h)
sort.Slice(handles, func(i, j int) bool {
return handles[i].Compare(handles[j]) < 0
})
return distsql.TableHandlesToKVRanges(pid, handles), nil
}

func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *TableReaderExecutor, reqBuilderWithRange distsql.RequestBuilder) (Executor, error) {
func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *TableReaderExecutor, reqBuilderWithRange distsql.RequestBuilder) (*TableReaderExecutor, error) {
startTS, err := builder.getSnapshotTS()
if err != nil {
return nil, err
Expand All @@ -2987,7 +3021,7 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
return e, nil
}

func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context, e *TableReaderExecutor, handles []kv.Handle) (Executor, error) {
func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context, e *TableReaderExecutor, handles []kv.Handle) (*TableReaderExecutor, error) {
sort.Slice(handles, func(i, j int) bool {
return handles[i].Compare(handles[j]) < 0
})
Expand Down
38 changes: 20 additions & 18 deletions executor/index_lookup_merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,26 @@ func (s *testSuite9) TestIssue18631(c *C) {
func (s *testSuiteWithData) TestIndexJoinOnSinglePartitionTable(c *C) {
// For issue 19145
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set @try_old_partition_implementation = 1")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int) ) partition by range (c_int) ( partition p0 values less than (10), partition p1 values less than maxvalue )")
tk.MustExec("create table t2 (c_int int, c_str varchar(40), primary key (c_int) ) partition by range (c_int) ( partition p0 values less than (10), partition p1 values less than maxvalue )")
tk.MustExec("insert into t1 values (1, 'Alice')")
tk.MustExec("insert into t2 values (1, 'Bob')")
sql := "select /*+ INL_MERGE_JOIN(t1,t2) */ * from t1 join t2 partition(p0) on t1.c_int = t2.c_int and t1.c_str < t2.c_str"
tk.MustQuery(sql).Check(testkit.Rows("1 Alice 1 Bob"))
rows := s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + sql).Rows())
c.Assert(strings.Index(rows[0], "IndexMergeJoin"), Equals, 0)
for _, val := range []string{"1", "null"} {
tk.MustExec("set @try_old_partition_implementation = " + val)
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (c_int int, c_str varchar(40), primary key (c_int) ) partition by range (c_int) ( partition p0 values less than (10), partition p1 values less than maxvalue )")
tk.MustExec("create table t2 (c_int int, c_str varchar(40), primary key (c_int) ) partition by range (c_int) ( partition p0 values less than (10), partition p1 values less than maxvalue )")
tk.MustExec("insert into t1 values (1, 'Alice')")
tk.MustExec("insert into t2 values (1, 'Bob')")
sql := "select /*+ INL_MERGE_JOIN(t1,t2) */ * from t1 join t2 partition(p0) on t1.c_int = t2.c_int and t1.c_str < t2.c_str"
tk.MustQuery(sql).Check(testkit.Rows("1 Alice 1 Bob"))
rows := s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + sql).Rows())
c.Assert(strings.Index(rows[0], "IndexMergeJoin"), Equals, 0)

sql = "select /*+ INL_HASH_JOIN(t1,t2) */ * from t1 join t2 partition(p0) on t1.c_int = t2.c_int and t1.c_str < t2.c_str"
tk.MustQuery(sql).Check(testkit.Rows("1 Alice 1 Bob"))
rows = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + sql).Rows())
c.Assert(strings.Index(rows[0], "IndexHashJoin"), Equals, 0)
sql = "select /*+ INL_HASH_JOIN(t1,t2) */ * from t1 join t2 partition(p0) on t1.c_int = t2.c_int and t1.c_str < t2.c_str"
tk.MustQuery(sql).Check(testkit.Rows("1 Alice 1 Bob"))
rows = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + sql).Rows())
c.Assert(strings.Index(rows[0], "IndexHashJoin"), Equals, 0)

sql = "select /*+ INL_JOIN(t1,t2) */ * from t1 join t2 partition(p0) on t1.c_int = t2.c_int and t1.c_str < t2.c_str"
tk.MustQuery(sql).Check(testkit.Rows("1 Alice 1 Bob"))
rows = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + sql).Rows())
c.Assert(strings.Index(rows[0], "IndexJoin"), Equals, 0)
sql = "select /*+ INL_JOIN(t1,t2) */ * from t1 join t2 partition(p0) on t1.c_int = t2.c_int and t1.c_str < t2.c_str"
tk.MustQuery(sql).Check(testkit.Rows("1 Alice 1 Bob"))
rows = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + sql).Rows())
c.Assert(strings.Index(rows[0], "IndexJoin"), Equals, 0)
}
}
23 changes: 21 additions & 2 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,25 @@ func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Con
return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs)
}

type kvRangeBuilder interface {
buildKeyRange(pid int64) ([]kv.KeyRange, error)
}

// TableReaderExecutor sends DAG request and reads table data from kv layer.
type TableReaderExecutor struct {
baseExecutor

table table.Table
table table.Table

// The source of key ranges varies from case to case.
// It may be calculated from PyhsicalPlan by executorBuilder, or calculated from argument by dataBuilder;
// It may be calculated from ranger.Ranger, or calculated from handles.
// The table ID may also change because of the partition table, and causes the key range to change.
// So instead of keeping a `range` struct field, it's better to define a interface.
kvRangeBuilder
// TODO: remove this field, use the kvRangeBuilder interface.
ranges []*ranger.Range

// kvRanges are only use for union scan.
kvRanges []kv.KeyRange
dagPB *tipb.DAGRequest
Expand Down Expand Up @@ -199,7 +212,13 @@ func (e *TableReaderExecutor) Close() error {
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
var reqBuilder *distsql.RequestBuilder
if e.table.Meta() != nil && e.table.Meta().IsCommonHandle {
if e.kvRangeBuilder != nil {
kvRange, err := e.kvRangeBuilder.buildKeyRange(getPhysicalTableID(e.table))
if err != nil {
return nil, err
}
reqBuilder = builder.SetKeyRanges(kvRange)
} else if e.table.Meta() != nil && e.table.Meta().IsCommonHandle {
reqBuilder = builder.SetCommonHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), ranges)
} else {
reqBuilder = builder.SetTableRanges(getPhysicalTableID(e.table), ranges, e.feedback)
Expand Down

0 comments on commit 47526d3

Please sign in to comment.