Skip to content

Commit

Permalink
Merge pull request #1 from lysu/hd_2pc_robi
Browse files Browse the repository at this point in the history
cherry pick and add log
  • Loading branch information
XuHuaiyu committed Nov 23, 2018
2 parents d7b4c09 + f962dde commit c86200d
Show file tree
Hide file tree
Showing 15 changed files with 349 additions and 92 deletions.
72 changes: 56 additions & 16 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,11 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
hasRefCols: v.NeedFillDefaultValue,
SelectExec: selectExec,
}
err := ivs.initInsertColumns()
if err != nil {
b.err = err
return nil
}

if v.IsReplace {
return b.buildReplace(ivs)
Expand All @@ -572,25 +577,23 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
GenColumns: v.GenCols.Columns,
GenExprs: v.GenCols.Exprs,
}
tableCols := tbl.Cols()
columns, err := insertVal.getColumns(tableCols)
err := insertVal.initInsertColumns()
if err != nil {
b.err = errors.Trace(err)
b.err = err
return nil
}
loadDataExec := &LoadDataExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ExplainID()),
IsLocal: v.IsLocal,
loadDataInfo: &LoadDataInfo{
row: make([]types.Datum, len(columns)),
row: make([]types.Datum, len(insertVal.insertColumns)),
InsertValues: insertVal,
Path: v.Path,
Table: tbl,
FieldsInfo: v.FieldsInfo,
LinesInfo: v.LinesInfo,
IgnoreLines: v.IgnoreLines,
Ctx: b.ctx,
columns: columns,
},
}

Expand Down Expand Up @@ -670,24 +673,43 @@ func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
}

func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
src := b.build(v.Children()[0])
reader := b.build(v.Children()[0])
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src)}
us, err := b.buildUnionScanFromReader(reader, v)
if err != nil {
b.err = err
return nil
}
return us
}

// buildUnionScanFromReader builds union scan executor from child executor.
// Note that this function may be called by inner workers of index lookup join concurrently.
// Be careful to avoid data race.
func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) (Executor, error) {
var err error
us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)}
// Get the handle column index of the below plannercore.
// We can guarantee that there must be only one col in the map.
for _, cols := range v.Children()[0].Schema().TblID2Handle {
us.belowHandleIndex = cols[0].Index
}
switch x := src.(type) {
switch x := reader.(type) {
case *TableReaderExecutor:
us.desc = x.desc
// Union scan can only be in a write transaction, so DirtyDB should has non-nil value now, thus
// GetDirtyDB() is safe here. If this table has been modified in the transaction, non-nil DirtyTable
// can be found in DirtyDB now, so GetDirtyTable is safe; if this table has not been modified in the
// transaction, empty DirtyTable would be inserted into DirtyDB, it does not matter when multiple
// goroutines write empty DirtyTable to DirtyDB for this table concurrently. Thus we don't use lock
// to synchronize here.
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
b.err = us.buildAndSortAddedRows()
err = us.buildAndSortAddedRows()
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -701,7 +723,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
b.err = us.buildAndSortAddedRows()
err = us.buildAndSortAddedRows()
case *IndexLookUpExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -715,16 +737,16 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
us.conditions = v.Conditions
us.columns = x.columns
b.err = us.buildAndSortAddedRows()
err = us.buildAndSortAddedRows()
default:
// The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting.
return src
return reader, nil
}
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
if err != nil {
err = errors.Trace(err)
return nil, err
}
return us
return us, nil
}

// buildMergeJoin builds MergeJoinExec executor.
Expand Down Expand Up @@ -1806,10 +1828,28 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context,
return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff)
}
return nil, errors.New("Wrong plan type for dataReaderBuilder")
}

func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) {
childBuilder := &dataReaderBuilder{v.Children()[0], builder.executorBuilder}
reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff)
if err != nil {
return nil, err
}
e, err := builder.buildUnionScanFromReader(reader, v)
if err != nil {
return nil, err
}
us := e.(*UnionScanExec)
us.snapshotChunkBuffer = us.newFirstChunk()
return us, nil
}

func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) {
e, err := buildNoRangeTableReader(builder.executorBuilder, v)
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3206,3 +3206,20 @@ func (s *testSuite) TestCurrentTimestampValueSelection(c *C) {
c.Assert(strings.Split(b, ".")[1], Equals, "00")
c.Assert(len(strings.Split(d, ".")[1]), Equals, 3)
}

func (s *testSuite) TestRowID(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec(`drop table if exists t`)
tk.MustExec(`create table t(a varchar(10), b varchar(10), c varchar(1), index idx(a, b, c));`)
tk.MustExec(`insert into t values('a', 'b', 'c');`)
tk.MustExec(`insert into t values('a', 'b', 'c');`)
tk.MustQuery(`select b, _tidb_rowid from t use index(idx) where a = 'a';`).Check(testkit.Rows(
`b 1`,
`b 2`,
))
tk.MustExec(`begin;`)
tk.MustExec(`select * from t for update`)
tk.MustQuery(`select distinct b from t use index(idx) where a = 'a';`).Check(testkit.Rows(`b`))
tk.MustExec(`commit;`)
}
80 changes: 80 additions & 0 deletions executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,83 @@ func (s *testSuite) TestIndexLookupJoinHang(c *C) {
}
rs.Close()
}

func (s *testSuite) TestIndexJoinUnionScan(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t1(id int primary key, a int)")
tk.MustExec("create table t2(id int primary key, a int, b int, key idx_a(a))")
tk.MustExec("insert into t2 values (1,1,1),(4,2,4)")
tk.MustExec("begin")
tk.MustExec("insert into t1 values(2,2)")
tk.MustExec("insert into t2 values(2,2,2), (3,3,3)")
// TableScan below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
"IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.id",
"├─UnionScan_12 10000.00 root ",
"│ └─TableReader_14 10000.00 root data:TableScan_13",
"│ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
"└─UnionScan_10 10.00 root ",
" └─TableReader_9 10.00 root data:TableScan_8",
" └─TableScan_8 10.00 cop table:t2, range: decided by [test.t1.a], keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
"2 2 2 2 2",
))
// IndexLookUp below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"IndexJoin_12 12500.00 root inner join, inner:UnionScan_11, outer key:test.t1.a, inner key:test.t2.a",
"├─UnionScan_13 10000.00 root ",
"│ └─TableReader_15 10000.00 root data:TableScan_14",
"│ └─TableScan_14 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
"└─UnionScan_11 10.00 root ",
" └─IndexLookUp_10 10.00 root ",
" ├─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
" └─TableScan_9 10.00 cop table:t2, keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"2 2 2 2 2",
"2 2 4 2 4",
))
// IndexScan below UnionScan
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"Projection_7 12500.00 root test.t1.a, test.t2.a",
"└─IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.a",
" ├─UnionScan_12 10000.00 root ",
" │ └─TableReader_14 10000.00 root data:TableScan_13",
" │ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─UnionScan_10 10.00 root ",
" └─IndexReader_9 10.00 root index:IndexScan_8",
" └─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"2 2",
"2 2",
))
tk.MustExec("rollback")
}

func (s *testSuite) TestBatchIndexJoinUnionScan(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t1(id int primary key, a int)")
tk.MustExec("create table t2(id int primary key, a int, key idx_a(a))")
tk.MustExec("set @@session.tidb_max_chunk_size=1")
tk.MustExec("set @@session.tidb_index_join_batch_size=1")
tk.MustExec("set @@session.tidb_index_lookup_join_concurrency=4")
tk.MustExec("begin")
tk.MustExec("insert into t1 values(1,1),(2,1),(3,1),(4,1)")
tk.MustExec("insert into t2 values(1,1)")
tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ count(*) from t1 join t2 on t1.a = t2.a").Check(testkit.Rows(
"StreamAgg_13 1.00 root funcs:count(1)",
"└─IndexJoin_24 12500.00 root inner join, inner:UnionScan_23, outer key:test.t1.a, inner key:test.t2.a",
" ├─UnionScan_25 10000.00 root ",
" │ └─TableReader_27 10000.00 root data:TableScan_26",
" │ └─TableScan_26 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo",
" └─UnionScan_23 10.00 root ",
" └─IndexReader_22 10.00 root index:IndexScan_21",
" └─IndexScan_21 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ count(*) from t1 join t2 on t1.a = t2.id").Check(testkit.Rows(
"4",
))
tk.MustExec("rollback")
}
10 changes: 3 additions & 7 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,10 @@ func (e *InsertExec) batchUpdateDupRows(newRows [][]types.Datum) error {
// Next implements Exec Next interface.
func (e *InsertExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
cols, err := e.getColumns(e.Table.Cols())
if err != nil {
return errors.Trace(err)
}

if len(e.children) > 0 && e.children[0] != nil {
return errors.Trace(e.insertRowsFromSelect(ctx, cols, e.exec))
return e.insertRowsFromSelect(ctx, e.exec)
}
return errors.Trace(e.insertRows(cols, e.exec))
return e.insertRows(e.exec)
}

// Close implements the Executor Close interface.
Expand All @@ -154,6 +149,7 @@ func (e *InsertExec) Open(ctx context.Context) error {
if e.SelectExec != nil {
return e.SelectExec.Open(ctx)
}
e.initEvalBuffer()
return nil
}

Expand Down
Loading

0 comments on commit c86200d

Please sign in to comment.