Skip to content

Commit

Permalink
executor: fix panic and update error data when table has column in wr…
Browse files Browse the repository at this point in the history
…ite only state (#8792) (#8904)
  • Loading branch information
crazycs520 authored and winkyao committed Jan 2, 2019
1 parent 94af2bc commit ef9b85c
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 46 deletions.
170 changes: 170 additions & 0 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2119,3 +2119,173 @@ func (s *testDBSuite) TestAddIndexFailed(c *C) {
tk.MustExec("admin check index t idx_b")
tk.MustExec("admin check table t")
}

func (s *testDBSuite) TestTransactionOnAddDropColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (a int, b int);")
s.mustExec(c, "create table t2 (a int, b int);")
s.mustExec(c, "insert into t2 values (2,0)")

transactions := [][]string{
{
"begin",
"insert into t1 set a=1",
"update t1 set b=1 where a=1",
"commit",
},
{
"begin",
"insert into t1 select a,b from t2",
"update t1 set b=2 where a=2",
"commit",
},
}

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().SetHook(originHook)
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
switch job.SchemaState {
case model.StateWriteOnly, model.StateWriteReorganization, model.StateDeleteOnly, model.StateDeleteReorganization:
default:
return
}
// do transaction.
for _, transaction := range transactions {
for _, sql := range transaction {
s.mustExec(c, sql)
}
}
}
s.dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(s.store, "alter table t1 add column c int not null after a", done)
err := <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2"))
s.mustExec(c, "delete from t1")

// test transaction on drop column.
go backgroundExec(s.store, "alter table t1 drop column c", done)
err = <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2"))
}

func (s *testDBSuite) TestTransactionWithWriteOnlyColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (a int key);")

transactions := [][]string{
{
"begin",
"insert into t1 set a=1",
"update t1 set a=2 where a=1",
"commit",
},
}

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().SetHook(originHook)
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
switch job.SchemaState {
case model.StateWriteOnly:
default:
return
}
// do transaction.
for _, transaction := range transactions {
for _, sql := range transaction {
s.mustExec(c, sql)
}
}
}
s.dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(s.store, "alter table t1 add column c int not null", done)
err := <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
s.mustExec(c, "delete from t1")

// test transaction on drop column.
go backgroundExec(s.store, "alter table t1 drop column c", done)
err = <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
}

func (s *testDBSuite) TestAddColumn2(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (a int key, b int);")
defer s.mustExec(c, "drop table if exists t1, t2")

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().SetHook(originHook)
hook := &ddl.TestDDLCallback{}
var writeOnlyTable table.Table
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState == model.StateWriteOnly {
writeOnlyTable, _ = s.dom.InfoSchema().TableByID(job.TableID)
}
}
s.dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(s.store, "alter table t1 add column c int not null", done)
err := <-done
c.Assert(err, IsNil)

s.mustExec(c, "insert into t1 values (1,1,1)")
s.tk.MustQuery("select a,b,c from t1").Check(testkit.Rows("1 1 1"))

// mock for outdated tidb update record.
c.Assert(writeOnlyTable, NotNil)
ctx := context.Background()
err = s.tk.Se.NewTxn()
c.Assert(err, IsNil)
oldRow, err := writeOnlyTable.RowWithCols(s.tk.Se, 1, writeOnlyTable.WritableCols())
c.Assert(err, IsNil)
c.Assert(len(oldRow), Equals, 3)
err = writeOnlyTable.RemoveRecord(s.tk.Se, 1, oldRow)
c.Assert(err, IsNil)
_, err = writeOnlyTable.AddRecord(s.tk.Se, types.MakeDatums(oldRow[0].GetInt64(), 2, oldRow[2].GetInt64()), false, true)
c.Assert(err, IsNil)
err = s.tk.Se.StmtCommit()
c.Assert(err, IsNil)
err = s.tk.Se.CommitTxn(ctx)

s.tk.MustQuery("select a,b,c from t1").Check(testkit.Rows("1 2 1"))

// Test for _tidb_rowid
var re *testkit.Result
s.mustExec(c, "create table t2 (a int);")
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState != model.StateWriteOnly {
return
}
// allow write _tidb_rowid first
s.mustExec(c, "set @@tidb_opt_write_row_id=1")
s.mustExec(c, "begin")
s.mustExec(c, "insert into t2 (a,_tidb_rowid) values (1,2);")
re = s.tk.MustQuery(" select a,_tidb_rowid from t2;")
s.mustExec(c, "commit")

}
s.dom.DDL().SetHook(hook)

go backgroundExec(s.store, "alter table t2 add column b int not null default 3", done)
err = <-done
c.Assert(err, IsNil)
re.Check(testkit.Rows("1 2"))
s.tk.MustQuery("select a,b,_tidb_rowid from t2").Check(testkit.Rows("1 3 2"))
}
54 changes: 19 additions & 35 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,28 +154,30 @@ func updateRecord(ctx sessionctx.Context, h int64, oldData, newData []types.Datu
if err != nil {
return false, handleChanged, newHandle, 0, errors.Trace(err)
}
newHandle, err = t.AddRecord(ctx, newData, skipHandleCheck)
// the `affectedRows` is increased when adding new record.
newHandle, err = t.AddRecord(ctx, newData, skipHandleCheck, true)
if err != nil {
return false, handleChanged, newHandle, 0, errors.Trace(err)
}
if onDup {
sc.AddAffectedRows(1)
}
} else {
// Update record to new value and update index.
err = t.UpdateRecord(ctx, h, oldData, newData, modified)
}
if err != nil {
return false, handleChanged, newHandle, 0, errors.Trace(err)
}

tid := t.Meta().ID
ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, tid, h, nil)
if handleChanged {
ctx.StmtAddDirtyTableOP(DirtyTableAddRow, tid, newHandle, newData)
} else {
ctx.StmtAddDirtyTableOP(DirtyTableAddRow, tid, h, newData)
if err != nil {
return false, false, h, 0, errors.Trace(err)
}
if onDup {
sc.AddAffectedRows(2)
} else {
// if handleChanged == true, the `affectedRows` is calculated when add new record.
if !handleChanged {
sc.AddAffectedRows(1)
}
}
}

if onDup {
sc.AddAffectedRows(2)
} else {
sc.AddAffectedRows(1)
}
colSize := make(map[int64]int64)
for id, col := range t.Cols() {
val := int64(len(newData[id].GetBytes()) - len(oldData[id].GetBytes()))
Expand Down Expand Up @@ -395,21 +397,11 @@ func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error {
return nil
}

const (
// DirtyTableAddRow is the constant for dirty table operation type.
DirtyTableAddRow = iota
// DirtyTableDeleteRow is the constant for dirty table operation type.
DirtyTableDeleteRow
// DirtyTableTruncate is the constant for dirty table operation type.
DirtyTableTruncate
)

func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h int64, data []types.Datum) error {
err := t.RemoveRecord(ctx, h, data)
if err != nil {
return errors.Trace(err)
}
ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, t.Meta().ID, h, nil)
ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
colSize := make(map[int64]int64)
for id, col := range t.Cols() {
Expand Down Expand Up @@ -851,9 +843,6 @@ func (e *InsertExec) insertOneRow(row []types.Datum) (int64, error) {
if err != nil {
return 0, errors.Trace(err)
}
if !e.ctx.GetSessionVars().ImportingData {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
}
e.rowCount++
return h, nil
}
Expand Down Expand Up @@ -907,9 +896,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) (types.Datu
h, err := e.Table.AddRecord(e.ctx, row, false)
txn.DelOption(kv.PresumeKeyNotExists)
if err == nil {
if !sessVars.ImportingData {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
}
e.rowCount++
continue
}
Expand Down Expand Up @@ -1863,7 +1849,6 @@ func (e *ReplaceExec) exec(ctx context.Context, rows [][]types.Datum) (types.Dat
row := rows[idx]
h, err1 := e.Table.AddRecord(e.ctx, row, false)
if err1 == nil {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
idx++
continue
}
Expand Down Expand Up @@ -1891,7 +1876,6 @@ func (e *ReplaceExec) exec(ctx context.Context, rows [][]types.Datum) (types.Dat
if err1 != nil {
return nil, errors.Trace(err1)
}
e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, h, nil)
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
}

Expand Down
60 changes: 60 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1690,3 +1690,63 @@ func (s *testSuite) TestDefEnumInsert(c *C) {
tk.MustExec("insert into test (id) values (1)")
tk.MustQuery("select prescription_type from test").Check(testkit.Rows("a"))
}

func (s *testSuite) TestUpdateAffectRowCnt(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table a(id int auto_increment, a int default null, primary key(id))")
tk.MustExec("insert into a values (1, 1001), (2, 1001), (10001, 1), (3, 1)")
tk.MustExec("update a set id = id*10 where a = 1001")
ctx := tk.Se.(sessionctx.Context)
c.Assert(ctx.GetSessionVars().StmtCtx.AffectedRows(), Equals, uint64(2))

tk.MustExec("drop table a")
tk.MustExec("create table a ( a bigint, b bigint)")
tk.MustExec("insert into a values (1, 1001), (2, 1001), (10001, 1), (3, 1)")
tk.MustExec("update a set a = a*10 where b = 1001")
ctx = tk.Se.(sessionctx.Context)
c.Assert(ctx.GetSessionVars().StmtCtx.AffectedRows(), Equals, uint64(2))
}

func (s *testSuite) TestInsertOnDuplicateKey(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec(`drop table if exists t1, t2;`)
tk.MustExec(`create table t1(a1 bigint primary key, b1 bigint);`)
tk.MustExec(`create table t2(a2 bigint primary key, b2 bigint);`)
tk.MustExec(`insert into t1 values(1, 100);`)
c.Assert(tk.Se.AffectedRows(), Equals, uint64(1))
tk.MustExec(`insert into t2 values(1, 200);`)
c.Assert(tk.Se.AffectedRows(), Equals, uint64(1))

tk.MustExec(`insert into t1 values (1, 200) on duplicate key update b1 = 1;`)
c.Assert(tk.Se.AffectedRows(), Equals, uint64(2))
tk.MustQuery(`select * from t1;`).Check(testkit.Rows("1 1"))

tk.MustExec(`insert into t1 values (1, 200) on duplicate key update b1 = 200;`)
c.Assert(tk.Se.AffectedRows(), Equals, uint64(2))
tk.MustQuery(`select * from t1;`).Check(testkit.Rows("1 200"))

tk.MustExec(`insert into t1 values (1, 200) on duplicate key update a1 = 1;`)
c.Assert(tk.Se.AffectedRows(), Equals, uint64(0))
tk.MustQuery(`select * from t1;`).Check(testkit.Rows("1 200"))

tk.MustExec(`insert into t1 values (1, 200) on duplicate key update b1 = 300;`)
c.Assert(tk.Se.AffectedRows(), Equals, uint64(2))
tk.MustQuery(`select * from t1;`).Check(testkit.Rows("1 300"))

tk.MustExec(`insert into t1 values(1, 1) on duplicate key update b1 = 400;`)
c.Assert(tk.Se.AffectedRows(), Equals, uint64(2))
tk.MustQuery(`select * from t1;`).Check(testkit.Rows("1 400"))

tk.MustExec(`insert into t1 select 1, 500 from t2 on duplicate key update b1 = 400;`)
c.Assert(tk.Se.AffectedRows(), Equals, uint64(0))
tk.MustQuery(`select * from t1;`).Check(testkit.Rows("1 400"))

tk.MustExec(`drop table if exists t1, t2;`)
tk.MustExec(`create table t1(a bigint primary key, b bigint);`)
tk.MustExec(`create table t2(a bigint primary key, b bigint);`)
_, err := tk.Exec(`insert into t1 select * from t2 on duplicate key update c = t2.b;`)
c.Assert(err.Error(), Equals, `column c not found`)
}
2 changes: 1 addition & 1 deletion infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,7 @@ func (it *infoschemaTable) RecordKey(h int64) kv.Key {
return nil
}

func (it *infoschemaTable) AddRecord(ctx sessionctx.Context, r []types.Datum, skipHandleCheck bool) (recordID int64, err error) {
func (it *infoschemaTable) AddRecord(ctx sessionctx.Context, r []types.Datum, skipHandleCheck bool, isUpdate ...bool) (recordID int64, err error) {
return 0, table.ErrUnsupportedOp
}

Expand Down
7 changes: 4 additions & 3 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package session

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/table"
"runtime/debug"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -219,11 +220,11 @@ func mergeToMutation(m1, m2 *binlog.TableMutation) {

func mergeToDirtyDB(dirtyDB *executor.DirtyDB, op dirtyTableOperation) {
switch op.kind {
case executor.DirtyTableAddRow:
case table.DirtyTableAddRow:
dirtyDB.AddRow(op.tid, op.handle, op.row)
case executor.DirtyTableDeleteRow:
case table.DirtyTableDeleteRow:
dirtyDB.DeleteRow(op.tid, op.handle)
case executor.DirtyTableTruncate:
case table.DirtyTableTruncate:
dirtyDB.TruncateTable(op.tid)
}
}
Expand Down
Loading

0 comments on commit ef9b85c

Please sign in to comment.