diff --git a/ddl/db_test.go b/ddl/db_test.go index 1ea01cc3b86c3..4563d3108de88 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -3652,3 +3652,174 @@ func getPartitionTableRecordsNum(c *C, ctx sessionctx.Context, tbl table.Partiti } return num } + +func (s *testDBSuite) TestTransactionOnAddDropColumn(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + s.mustExec(c, "use test_db") + s.mustExec(c, "drop table if exists t1") + s.mustExec(c, "create table t1 (a int, b int);") + s.mustExec(c, "create table t2 (a int, b int);") + s.mustExec(c, "insert into t2 values (2,0)") + + transactions := [][]string{ + { + "begin", + "insert into t1 set a=1", + "update t1 set b=1 where a=1", + "commit", + }, + { + "begin", + "insert into t1 select a,b from t2", + "update t1 set b=2 where a=2", + "commit", + }, + } + + originHook := s.dom.DDL().GetHook() + defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) + hook := &ddl.TestDDLCallback{} + hook.OnJobRunBeforeExported = func(job *model.Job) { + switch job.SchemaState { + case model.StateWriteOnly, model.StateWriteReorganization, model.StateDeleteOnly, model.StateDeleteReorganization: + default: + return + } + // do transaction. + for _, transaction := range transactions { + for _, sql := range transaction { + s.mustExec(c, sql) + } + } + } + s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + done := make(chan error, 1) + // test transaction on add column. + go backgroundExec(s.store, "alter table t1 add column c int not null after a", done) + err := <-done + c.Assert(err, IsNil) + s.tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2")) + s.mustExec(c, "delete from t1") + + // test transaction on drop column. + go backgroundExec(s.store, "alter table t1 drop column c", done) + err = <-done + c.Assert(err, IsNil) + s.tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2")) +} + +func (s *testDBSuite) TestTransactionWithWriteOnlyColumn(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + s.mustExec(c, "use test_db") + s.mustExec(c, "drop table if exists t1") + s.mustExec(c, "create table t1 (a int key);") + + transactions := [][]string{ + { + "begin", + "insert into t1 set a=1", + "update t1 set a=2 where a=1", + "commit", + }, + } + + originHook := s.dom.DDL().GetHook() + defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) + hook := &ddl.TestDDLCallback{} + hook.OnJobRunBeforeExported = func(job *model.Job) { + switch job.SchemaState { + case model.StateWriteOnly: + default: + return + } + // do transaction. + for _, transaction := range transactions { + for _, sql := range transaction { + s.mustExec(c, sql) + } + } + } + s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + done := make(chan error, 1) + // test transaction on add column. + go backgroundExec(s.store, "alter table t1 add column c int not null", done) + err := <-done + c.Assert(err, IsNil) + s.tk.MustQuery("select a from t1").Check(testkit.Rows("2")) + s.mustExec(c, "delete from t1") + + // test transaction on drop column. + go backgroundExec(s.store, "alter table t1 drop column c", done) + err = <-done + c.Assert(err, IsNil) + s.tk.MustQuery("select a from t1").Check(testkit.Rows("2")) +} + +func (s *testDBSuite) TestAddColumn2(c *C) { + s.tk = testkit.NewTestKit(c, s.store) + s.mustExec(c, "use test_db") + s.mustExec(c, "drop table if exists t1") + s.mustExec(c, "create table t1 (a int key, b int);") + defer s.mustExec(c, "drop table if exists t1, t2") + + originHook := s.dom.DDL().GetHook() + defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) + hook := &ddl.TestDDLCallback{} + var writeOnlyTable table.Table + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.SchemaState == model.StateWriteOnly { + writeOnlyTable, _ = s.dom.InfoSchema().TableByID(job.TableID) + } + } + s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + done := make(chan error, 1) + // test transaction on add column. + go backgroundExec(s.store, "alter table t1 add column c int not null", done) + err := <-done + c.Assert(err, IsNil) + + s.mustExec(c, "insert into t1 values (1,1,1)") + s.tk.MustQuery("select a,b,c from t1").Check(testkit.Rows("1 1 1")) + + // mock for outdated tidb update record. + c.Assert(writeOnlyTable, NotNil) + ctx := context.Background() + err = s.tk.Se.NewTxn(ctx) + c.Assert(err, IsNil) + oldRow, err := writeOnlyTable.RowWithCols(s.tk.Se, 1, writeOnlyTable.WritableCols()) + c.Assert(err, IsNil) + c.Assert(len(oldRow), Equals, 3) + err = writeOnlyTable.RemoveRecord(s.tk.Se, 1, oldRow) + c.Assert(err, IsNil) + _, err = writeOnlyTable.AddRecord(s.tk.Se, types.MakeDatums(oldRow[0].GetInt64(), 2, oldRow[2].GetInt64()), + &table.AddRecordOpt{IsUpdate: true}) + c.Assert(err, IsNil) + err = s.tk.Se.StmtCommit() + c.Assert(err, IsNil) + err = s.tk.Se.CommitTxn(ctx) + + s.tk.MustQuery("select a,b,c from t1").Check(testkit.Rows("1 2 1")) + + // Test for _tidb_rowid + var re *testkit.Result + s.mustExec(c, "create table t2 (a int);") + hook.OnJobRunBeforeExported = func(job *model.Job) { + if job.SchemaState != model.StateWriteOnly { + return + } + // allow write _tidb_rowid first + s.mustExec(c, "set @@tidb_opt_write_row_id=1") + s.mustExec(c, "begin") + s.mustExec(c, "insert into t2 (a,_tidb_rowid) values (1,2);") + re = s.tk.MustQuery(" select a,_tidb_rowid from t2;") + s.mustExec(c, "commit") + + } + s.dom.DDL().(ddl.DDLForTest).SetHook(hook) + + go backgroundExec(s.store, "alter table t2 add column b int not null default 3", done) + err = <-done + c.Assert(err, IsNil) + re.Check(testkit.Rows("1 2")) + s.tk.MustQuery("select a,b,_tidb_rowid from t2").Check(testkit.Rows("1 3 2")) +} diff --git a/executor/write.go b/executor/write.go index 3d3ae3ed015f1..af2c21c7d798a 100644 --- a/executor/write.go +++ b/executor/write.go @@ -141,7 +141,7 @@ func updateRecord(ctx sessionctx.Context, h int64, oldData, newData []types.Datu } // the `affectedRows` is increased when adding new record. newHandle, err = t.AddRecord(ctx, newData, - &table.AddRecordOpt{CreateIdxOpt: table.CreateIdxOpt{SkipHandleCheck: sc.DupKeyAsWarning}}) + &table.AddRecordOpt{CreateIdxOpt: table.CreateIdxOpt{SkipHandleCheck: sc.DupKeyAsWarning}, IsUpdate: true}) if err != nil { return false, false, 0, errors.Trace(err) } diff --git a/table/table.go b/table/table.go index b23a77dce57c8..9f86c1fe94dbf 100644 --- a/table/table.go +++ b/table/table.go @@ -89,6 +89,7 @@ type RecordIterFunc func(h int64, rec []types.Datum, cols []*Column) (more bool, // AddRecordOpt contains the options will be used when adding a record. type AddRecordOpt struct { CreateIdxOpt + IsUpdate bool } // Table is used to retrieve and modify rows in table. diff --git a/table/tables/tables.go b/table/tables/tables.go index 7441182e2749a..8644f3cb1a06a 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -417,7 +417,10 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. } var hasRecordID bool cols := t.Cols() - if len(r) > len(cols) { + // opt.IsUpdate is a flag for update. + // If handle ID is changed when update, update will remove the old record first, and then call `AddRecord` to add a new record. + // Currently, only insert can set _tidb_rowid, update can not update _tidb_rowid. + if len(r) > len(cols) && !opt.IsUpdate { // The last value is _tidb_rowid. recordID = r[len(r)-1].GetInt64() hasRecordID = true @@ -464,12 +467,21 @@ func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts .. for _, col := range t.WritableCols() { var value types.Datum - if col.State != model.StatePublic { + // Update call `AddRecord` will already handle the write only column default value. + // Only insert should add default value for write only column. + if col.State != model.StatePublic && !opt.IsUpdate { // If col is in write only or write reorganization state, we must add it with its default value. value, err = table.GetColOriginDefaultValue(ctx, col.ToInfo()) if err != nil { return 0, errors.Trace(err) } + // add value to `r` for dirty db in transaction. + // Otherwise when update will panic cause by get value of column in write only state from dirty db. + if col.Offset < len(r) { + r[col.Offset] = value + } else { + r = append(r, value) + } } else { value = r[col.Offset] }