Skip to content

Commit

Permalink
executor: fix panic and update error data when table has column in wr…
Browse files Browse the repository at this point in the history
…ite only state
  • Loading branch information
crazycs520 committed Jan 2, 2019
1 parent e06c87d commit e95dca1
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 3 deletions.
171 changes: 171 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2034,3 +2034,174 @@ LOOP:
s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)
s.mustExec(c, "drop table t1")
}

func (s *testDBSuite) TestTransactionOnAddDropColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (a int, b int);")
s.mustExec(c, "create table t2 (a int, b int);")
s.mustExec(c, "insert into t2 values (2,0)")

transactions := [][]string{
{
"begin",
"insert into t1 set a=1",
"update t1 set b=1 where a=1",
"commit",
},
{
"begin",
"insert into t1 select a,b from t2",
"update t1 set b=2 where a=2",
"commit",
},
}

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
switch job.SchemaState {
case model.StateWriteOnly, model.StateWriteReorganization, model.StateDeleteOnly, model.StateDeleteReorganization:
default:
return
}
// do transaction.
for _, transaction := range transactions {
for _, sql := range transaction {
s.mustExec(c, sql)
}
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(s.store, "alter table t1 add column c int not null after a", done)
err := <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2"))
s.mustExec(c, "delete from t1")

// test transaction on drop column.
go backgroundExec(s.store, "alter table t1 drop column c", done)
err = <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2"))
}

func (s *testDBSuite) TestTransactionWithWriteOnlyColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (a int key);")

transactions := [][]string{
{
"begin",
"insert into t1 set a=1",
"update t1 set a=2 where a=1",
"commit",
},
}

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
switch job.SchemaState {
case model.StateWriteOnly:
default:
return
}
// do transaction.
for _, transaction := range transactions {
for _, sql := range transaction {
s.mustExec(c, sql)
}
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(s.store, "alter table t1 add column c int not null", done)
err := <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
s.mustExec(c, "delete from t1")

// test transaction on drop column.
go backgroundExec(s.store, "alter table t1 drop column c", done)
err = <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
}

func (s *testDBSuite) TestAddColumn2(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (a int key, b int);")
defer s.mustExec(c, "drop table if exists t1, t2")

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
hook := &ddl.TestDDLCallback{}
var writeOnlyTable table.Table
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState == model.StateWriteOnly {
writeOnlyTable, _ = s.dom.InfoSchema().TableByID(job.TableID)
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(s.store, "alter table t1 add column c int not null", done)
err := <-done
c.Assert(err, IsNil)

s.mustExec(c, "insert into t1 values (1,1,1)")
s.tk.MustQuery("select a,b,c from t1").Check(testkit.Rows("1 1 1"))

// mock for outdated tidb update record.
c.Assert(writeOnlyTable, NotNil)
ctx := context.Background()
err = s.tk.Se.NewTxn(ctx)
c.Assert(err, IsNil)
oldRow, err := writeOnlyTable.RowWithCols(s.tk.Se, 1, writeOnlyTable.WritableCols())
c.Assert(err, IsNil)
c.Assert(len(oldRow), Equals, 3)
err = writeOnlyTable.RemoveRecord(s.tk.Se, 1, oldRow)
c.Assert(err, IsNil)
_, err = writeOnlyTable.AddRecord(s.tk.Se, types.MakeDatums(oldRow[0].GetInt64(), 2, oldRow[2].GetInt64()),
&table.AddRecordOpt{IsUpdate: true})
c.Assert(err, IsNil)
err = s.tk.Se.StmtCommit()
c.Assert(err, IsNil)
err = s.tk.Se.CommitTxn(ctx)

s.tk.MustQuery("select a,b,c from t1").Check(testkit.Rows("1 2 1"))

// Test for _tidb_rowid
var re *testkit.Result
s.mustExec(c, "create table t2 (a int);")
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState != model.StateWriteOnly {
return
}
// allow write _tidb_rowid first
s.mustExec(c, "set @@tidb_opt_write_row_id=1")
s.mustExec(c, "begin")
s.mustExec(c, "insert into t2 (a,_tidb_rowid) values (1,2);")
re = s.tk.MustQuery(" select a,_tidb_rowid from t2;")
s.mustExec(c, "commit")

}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)

go backgroundExec(s.store, "alter table t2 add column b int not null default 3", done)
err = <-done
c.Assert(err, IsNil)
re.Check(testkit.Rows("1 2"))
s.tk.MustQuery("select a,b,_tidb_rowid from t2").Check(testkit.Rows("1 3 2"))
}
2 changes: 1 addition & 1 deletion executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func updateRecord(ctx sessionctx.Context, h int64, oldData, newData []types.Datu
}
// the `affectedRows` is increased when adding new record.
newHandle, err = t.AddRecord(ctx, newData,
&table.AddRecordOpt{CreateIdxOpt: table.CreateIdxOpt{SkipHandleCheck: sc.DupKeyAsWarning}})
&table.AddRecordOpt{CreateIdxOpt: table.CreateIdxOpt{SkipHandleCheck: sc.DupKeyAsWarning}, IsUpdate: true})
if err != nil {
return false, false, 0, errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type RecordIterFunc func(h int64, rec []types.Datum, cols []*Column) (more bool,
// AddRecordOpt contains the options will be used when adding a record.
type AddRecordOpt struct {
CreateIdxOpt
IsUpdate bool
}

// Table is used to retrieve and modify rows in table.
Expand Down
11 changes: 9 additions & 2 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ..
}
var hasRecordID bool
cols := t.Cols()
if len(r) > len(cols) {
if len(r) > len(cols) && !opt.IsUpdate {
// The last value is _tidb_rowid.
recordID = r[len(r)-1].GetInt64()
hasRecordID = true
Expand Down Expand Up @@ -466,12 +466,19 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ..

for _, col := range t.WritableCols() {
var value types.Datum
if col.State != model.StatePublic {
if col.State != model.StatePublic && !opt.IsUpdate {
// If col is in write only or write reorganization state, we must add it with its default value.
value, err = table.GetColOriginDefaultValue(ctx, col.ToInfo())
if err != nil {
return 0, errors.Trace(err)
}
// add value to `r` for dirty db in transaction.
// Otherwise when update will panic cause by get value of column in write only state from dirty db.
if col.Offset < len(r) {
r[col.Offset] = value
} else {
r = append(r, value)
}
} else {
value = r[col.Offset]
}
Expand Down

0 comments on commit e95dca1

Please sign in to comment.