Skip to content

Commit

Permalink
executor: fix panic and update error data when table has column in wr…
Browse files Browse the repository at this point in the history
…ite only state (pingcap#8792)
  • Loading branch information
crazycs520 committed Jan 2, 2019
1 parent cceab82 commit 3433531
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 38 deletions.
170 changes: 170 additions & 0 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2033,3 +2033,173 @@ func (s *testDBSuite) TestAddIndexFailed(c *C) {
tk.MustExec("admin check index t idx_b")
tk.MustExec("admin check table t")
}

func (s *testDBSuite) TestTransactionOnAddDropColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (a int, b int);")
s.mustExec(c, "create table t2 (a int, b int);")
s.mustExec(c, "insert into t2 values (2,0)")

transactions := [][]string{
{
"begin",
"insert into t1 set a=1",
"update t1 set b=1 where a=1",
"commit",
},
{
"begin",
"insert into t1 select a,b from t2",
"update t1 set b=2 where a=2",
"commit",
},
}

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().SetHook(originHook)
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
switch job.SchemaState {
case model.StateWriteOnly, model.StateWriteReorganization, model.StateDeleteOnly, model.StateDeleteReorganization:
default:
return
}
// do transaction.
for _, transaction := range transactions {
for _, sql := range transaction {
s.mustExec(c, sql)
}
}
}
s.dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(s.store, "alter table t1 add column c int not null after a", done)
err := <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2"))
s.mustExec(c, "delete from t1")

// test transaction on drop column.
go backgroundExec(s.store, "alter table t1 drop column c", done)
err = <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a,b from t1 order by a").Check(testkit.Rows("1 1", "1 1", "1 1", "2 2", "2 2", "2 2"))
}

func (s *testDBSuite) TestTransactionWithWriteOnlyColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (a int key);")

transactions := [][]string{
{
"begin",
"insert into t1 set a=1",
"update t1 set a=2 where a=1",
"commit",
},
}

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().SetHook(originHook)
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
switch job.SchemaState {
case model.StateWriteOnly:
default:
return
}
// do transaction.
for _, transaction := range transactions {
for _, sql := range transaction {
s.mustExec(c, sql)
}
}
}
s.dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(s.store, "alter table t1 add column c int not null", done)
err := <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
s.mustExec(c, "delete from t1")

// test transaction on drop column.
go backgroundExec(s.store, "alter table t1 drop column c", done)
err = <-done
c.Assert(err, IsNil)
s.tk.MustQuery("select a from t1").Check(testkit.Rows("2"))
}

func (s *testDBSuite) TestAddColumn2(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t1")
s.mustExec(c, "create table t1 (a int key, b int);")
defer s.mustExec(c, "drop table if exists t1, t2")

originHook := s.dom.DDL().GetHook()
defer s.dom.DDL().SetHook(originHook)
hook := &ddl.TestDDLCallback{}
var writeOnlyTable table.Table
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState == model.StateWriteOnly {
writeOnlyTable, _ = s.dom.InfoSchema().TableByID(job.TableID)
}
}
s.dom.DDL().SetHook(hook)
done := make(chan error, 1)
// test transaction on add column.
go backgroundExec(s.store, "alter table t1 add column c int not null", done)
err := <-done
c.Assert(err, IsNil)

s.mustExec(c, "insert into t1 values (1,1,1)")
s.tk.MustQuery("select a,b,c from t1").Check(testkit.Rows("1 1 1"))

// mock for outdated tidb update record.
c.Assert(writeOnlyTable, NotNil)
ctx := context.Background()
err = s.tk.Se.NewTxn()
c.Assert(err, IsNil)
oldRow, err := writeOnlyTable.RowWithCols(s.tk.Se, 1, writeOnlyTable.WritableCols())
c.Assert(err, IsNil)
c.Assert(len(oldRow), Equals, 3)
err = writeOnlyTable.RemoveRecord(s.tk.Se, 1, oldRow)
c.Assert(err, IsNil)
_, err = writeOnlyTable.AddRecord(s.tk.Se, types.MakeDatums(oldRow[0].GetInt64(), 2, oldRow[2].GetInt64()), false, true)
c.Assert(err, IsNil)
err = s.tk.Se.StmtCommit()
c.Assert(err, IsNil)
err = s.tk.Se.CommitTxn(ctx)

s.tk.MustQuery("select a,b,c from t1").Check(testkit.Rows("1 2 1"))

// Test for _tidb_rowid
var re *testkit.Result
s.mustExec(c, "create table t2 (a int);")
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState != model.StateWriteOnly {
return
}
// allow write _tidb_rowid first
s.mustExec(c, "set @@tidb_opt_write_row_id=1")
s.mustExec(c, "begin")
s.mustExec(c, "insert into t2 (a,_tidb_rowid) values (1,2);")
re = s.tk.MustQuery(" select a,_tidb_rowid from t2;")
s.mustExec(c, "commit")

}
s.dom.DDL().SetHook(hook)

go backgroundExec(s.store, "alter table t2 add column b int not null default 3", done)
err = <-done
c.Assert(err, IsNil)
re.Check(testkit.Rows("1 2"))
s.tk.MustQuery("select a,b,_tidb_rowid from t2").Check(testkit.Rows("1 3 2"))
}
31 changes: 4 additions & 27 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ func updateRecord(ctx sessionctx.Context, h int64, oldData, newData []types.Datu
if err != nil {
return false, handleChanged, newHandle, 0, errors.Trace(err)
}
newHandle, err = t.AddRecord(ctx, newData, skipHandleCheck)
newHandle, err = t.AddRecord(ctx, newData, skipHandleCheck, []bool{true}...)
if onDup {
sc.AddAffectedRows(1)
}
} else {
// Update record to new value and update index.
err = t.UpdateRecord(ctx, h, oldData, newData, modified)
Expand All @@ -163,14 +166,6 @@ func updateRecord(ctx sessionctx.Context, h int64, oldData, newData []types.Datu
return false, handleChanged, newHandle, 0, errors.Trace(err)
}

tid := t.Meta().ID
ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, tid, h, nil)
if handleChanged {
ctx.StmtAddDirtyTableOP(DirtyTableAddRow, tid, newHandle, newData)
} else {
ctx.StmtAddDirtyTableOP(DirtyTableAddRow, tid, h, newData)
}

if onDup {
sc.AddAffectedRows(2)
} else {
Expand Down Expand Up @@ -395,21 +390,11 @@ func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error {
return nil
}

const (
// DirtyTableAddRow is the constant for dirty table operation type.
DirtyTableAddRow = iota
// DirtyTableDeleteRow is the constant for dirty table operation type.
DirtyTableDeleteRow
// DirtyTableTruncate is the constant for dirty table operation type.
DirtyTableTruncate
)

func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h int64, data []types.Datum) error {
err := t.RemoveRecord(ctx, h, data)
if err != nil {
return errors.Trace(err)
}
ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, t.Meta().ID, h, nil)
ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
colSize := make(map[int64]int64)
for id, col := range t.Cols() {
Expand Down Expand Up @@ -851,9 +836,6 @@ func (e *InsertExec) insertOneRow(row []types.Datum) (int64, error) {
if err != nil {
return 0, errors.Trace(err)
}
if !e.ctx.GetSessionVars().ImportingData {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
}
e.rowCount++
return h, nil
}
Expand Down Expand Up @@ -907,9 +889,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) (types.Datu
h, err := e.Table.AddRecord(e.ctx, row, false)
txn.DelOption(kv.PresumeKeyNotExists)
if err == nil {
if !sessVars.ImportingData {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
}
e.rowCount++
continue
}
Expand Down Expand Up @@ -1863,7 +1842,6 @@ func (e *ReplaceExec) exec(ctx context.Context, rows [][]types.Datum) (types.Dat
row := rows[idx]
h, err1 := e.Table.AddRecord(e.ctx, row, false)
if err1 == nil {
e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row)
idx++
continue
}
Expand Down Expand Up @@ -1891,7 +1869,6 @@ func (e *ReplaceExec) exec(ctx context.Context, rows [][]types.Datum) (types.Dat
if err1 != nil {
return nil, errors.Trace(err1)
}
e.ctx.StmtAddDirtyTableOP(DirtyTableDeleteRow, e.Table.Meta().ID, h, nil)
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
}

Expand Down
2 changes: 1 addition & 1 deletion infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,7 +1252,7 @@ func (it *infoschemaTable) RecordKey(h int64) kv.Key {
return nil
}

func (it *infoschemaTable) AddRecord(ctx sessionctx.Context, r []types.Datum, skipHandleCheck bool) (recordID int64, err error) {
func (it *infoschemaTable) AddRecord(ctx sessionctx.Context, r []types.Datum, skipHandleCheck bool, isUpdate ...bool) (recordID int64, err error) {
return 0, table.ErrUnsupportedOp
}

Expand Down
7 changes: 4 additions & 3 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package session

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/table"
"runtime/debug"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -219,11 +220,11 @@ func mergeToMutation(m1, m2 *binlog.TableMutation) {

func mergeToDirtyDB(dirtyDB *executor.DirtyDB, op dirtyTableOperation) {
switch op.kind {
case executor.DirtyTableAddRow:
case table.DirtyTableAddRow:
dirtyDB.AddRow(op.tid, op.handle, op.row)
case executor.DirtyTableDeleteRow:
case table.DirtyTableDeleteRow:
dirtyDB.DeleteRow(op.tid, op.handle)
case executor.DirtyTableTruncate:
case table.DirtyTableTruncate:
dirtyDB.TruncateTable(op.tid)
}
}
Expand Down
11 changes: 10 additions & 1 deletion table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ const (
MemoryTable
)

const (
// DirtyTableAddRow is the constant for dirty table operation type.
DirtyTableAddRow = iota
// DirtyTableDeleteRow is the constant for dirty table operation type.
DirtyTableDeleteRow
// DirtyTableTruncate is the constant for dirty table operation type.
DirtyTableTruncate
)

var (
errColumnCantNull = terror.ClassTable.New(codeColumnCantNull, "column can not be null")
errUnknownColumn = terror.ClassTable.New(codeUnknownColumn, "unknown column")
Expand Down Expand Up @@ -113,7 +122,7 @@ type Table interface {

// AddRecord inserts a row which should contain only public columns
// skipHandleCheck indicates that recordID in r has been checked as not duplicate already.
AddRecord(ctx sessionctx.Context, r []types.Datum, skipHandleCheck bool) (recordID int64, err error)
AddRecord(ctx sessionctx.Context, r []types.Datum, skipHandleCheck bool, isUpdate ...bool) (recordID int64, err error)

// UpdateRecord updates a row which should contain only writable columns.
UpdateRecord(ctx sessionctx.Context, h int64, currData, newData []types.Datum, touched []bool) error
Expand Down
2 changes: 1 addition & 1 deletion table/tables/memory_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (t *MemoryTable) UpdateRecord(ctx sessionctx.Context, h int64, oldData, new
}

// AddRecord implements table.Table AddRecord interface.
func (t *MemoryTable) AddRecord(ctx sessionctx.Context, r []types.Datum, skipHandleCheck bool) (recordID int64, err error) {
func (t *MemoryTable) AddRecord(ctx sessionctx.Context, r []types.Datum, skipHandleCheck bool, isUpdate ...bool) (recordID int64, err error) {
if t.pkHandleCol != nil {
recordID, err = r[t.pkHandleCol.Offset].ToInt64(ctx.GetSessionVars().StmtCtx)
if err != nil {
Expand Down
Loading

0 comments on commit 3433531

Please sign in to comment.