Skip to content

Commit

Permalink
ddl, parser: reimplement check for multi-schema-change (pingcap#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 committed Mar 15, 2022
1 parent bd47071 commit 67eeacc
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 90 deletions.
6 changes: 5 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/table"
goutil "github.com/pingcap/tidb/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -618,6 +619,9 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
// In multiple schema change, we don't run the job.
// Instead, merge all the jobs into one pending job.
if err := fillMultiSchemaInfo(mci, job); err != nil {
return err
}
mci.MergeSubJob(job)
return nil
}
Expand Down Expand Up @@ -718,7 +722,7 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
}
}
logutil.BgLogger().Info("[ddl] DDL job is cancelled", zap.Int64("jobID", jobID))
return errCancelledDDLJob
return dbterror.ErrCancelledDDLJob
}
panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
}
Expand Down
39 changes: 1 addition & 38 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3410,10 +3410,6 @@ func checkAndCreateNewColumn(ctx sessionctx.Context, ti ast.Ident, schema *model
if err != nil {
return nil, errors.Trace(err)
}

if info := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; info != nil {
info.AddColumns = append(info.AddColumns, col.ColumnInfo)
}
return col, nil
}

Expand Down Expand Up @@ -4061,10 +4057,6 @@ func checkIsDroppableColumn(ctx sessionctx.Context, t table.Table, spec *ast.Alt
}
return false, err
}

if info := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; info != nil {
info.DropColumns = append(info.DropColumns, col.ColumnInfo)
}
if err = isDroppableColumn(ctx.GetSessionVars().EnableChangeMultiSchema, tblInfo, colName); err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -4467,7 +4459,7 @@ func (d *ddl) getModifiableColumnJob(ctx context.Context, sctx sessionctx.Contex
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
CtxVars: []interface{}{needChangeColData},
Args: []interface{}{&newCol, originalColName, spec.Position, modifyColumnTp, newAutoRandBits},
Args: []interface{}{&newCol.ColumnInfo, originalColName, spec.Position, modifyColumnTp, newAutoRandBits},
}
return job, nil
}
Expand Down Expand Up @@ -4711,10 +4703,6 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al
},
Args: []interface{}{&newCol, oldColName, spec.Position, 0},
}
if info := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; info != nil {
info.AddColumns = append(info.AddColumns, newCol)
info.DropColumns = append(info.DropColumns, oldCol.ToInfo())
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
Expand Down Expand Up @@ -4800,7 +4788,6 @@ func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{col},
}

err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
Expand Down Expand Up @@ -5171,14 +5158,6 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{spec.FromKey, spec.ToKey},
}
if info := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; info != nil {
idx := tb.Meta().FindIndexByName(spec.FromKey.L)
newIdx := idx.Clone()
newIdx.Name = spec.ToKey

info.DropIndexes = append(info.DropIndexes, idx)
info.AddIndexes = append(info.AddIndexes, newIdx)
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
Expand Down Expand Up @@ -5731,15 +5710,6 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
Args: []interface{}{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global},
Priority: ctx.GetSessionVars().DDLReorgPriority,
}

if info := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; info != nil {
info.AddIndexes = append(info.AddIndexes, &model.IndexInfo{
Name: indexName,
Columns: indexColumns,
State: model.StateNone,
})
}

err = d.doDDLJob(ctx, job)
// key exists, but if_not_exists flags is true, so we ignore this error.
if dbterror.ErrDupKeyName.Equal(err) && ifNotExists {
Expand Down Expand Up @@ -5940,10 +5910,6 @@ func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{indexName},
}

if info := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; info != nil {
info.DropIndexes = append(info.DropIndexes, indexInfo)
}
err = d.doDDLJob(ctx, job)
// index not exists, but if_exists flags is true, so we ignore this error.
if dbterror.ErrCantDropFieldOrKey.Equal(err) && ifExists {
Expand Down Expand Up @@ -5986,9 +5952,6 @@ func (d *ddl) DropIndexes(ctx sessionctx.Context, ti ast.Ident, specs []*ast.Alt

indexNames = append(indexNames, indexName)
ifExists = append(ifExists, spec.IfExists)
if info := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; info != nil {
info.DropIndexes = append(info.DropIndexes, indexInfo)
}
}

job := &model.Job{
Expand Down
121 changes: 93 additions & 28 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync"

"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -146,42 +147,106 @@ func handleRevertibleException(job *model.Job, res model.JobState, idx int) {
}
}

func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error) {
switch job.Type {
case model.ActionAddColumn:
col := job.Args[0].(*table.Column)
pos := job.Args[1].(*ast.ColumnPosition)
info.AddColumns = append(info.AddColumns, col.Name)
if pos.Tp == ast.ColumnPositionAfter {
info.RelativeColumns = append(info.RelativeColumns, pos.RelativeColumn.Name)
}
case model.ActionAddColumns:
cols := job.Args[0].([]*table.Column)
pos := job.Args[0].([]*ast.ColumnPosition)
for i := range cols {
info.AddColumns = append(info.AddColumns, cols[i].Name)
if pos[i].Tp == ast.ColumnPositionAfter {
info.RelativeColumns = append(info.RelativeColumns, pos[i].RelativeColumn.Name)
}
}
case model.ActionDropColumn:
colName := job.Args[0].(model.CIStr)
info.DropColumns = append(info.DropColumns, colName)
case model.ActionDropIndex:
indexName := job.Args[0].(model.CIStr)
info.DropIndexes = append(info.DropIndexes, indexName)
case model.ActionAddIndex, model.ActionAddPrimaryKey:
indexName := job.Args[1].(model.CIStr)
indexPartSpecifications := job.Args[2].([]*ast.IndexPartSpecification)
info.AddIndexes = append(info.AddIndexes, indexName)
for _, indexPartSpecification := range indexPartSpecifications {
info.RelativeColumns = append(info.RelativeColumns, indexPartSpecification.Column.Name)
}
case model.ActionRenameIndex:
from := job.Args[0].(model.CIStr)
to := job.Args[1].(model.CIStr)
info.AddIndexes = append(info.AddIndexes, to)
info.DropIndexes = append(info.DropIndexes, from)
case model.ActionModifyColumn:
newCol := *job.Args[0].(**model.ColumnInfo)
oldColName := job.Args[1].(model.CIStr)
pos := job.Args[2].(*ast.ColumnPosition)
if newCol.Name.L != oldColName.L {
info.AddColumns = append(info.AddColumns, newCol.Name)
info.DropColumns = append(info.DropColumns, oldColName)
} else {
info.RelativeColumns = append(info.RelativeColumns, newCol.Name)
}
if pos != nil && pos.Tp == ast.ColumnPositionAfter {
info.RelativeColumns = append(info.RelativeColumns, pos.RelativeColumn.Name)
}
default:
return dbterror.ErrRunMultiSchemaChanges
}
return nil
}

func checkOperateSameColumn(info *model.MultiSchemaInfo) error {
modifyCols := make(map[string]struct{})
modifyIdx := make(map[string]struct{})
for _, col := range info.AddColumns {
name := col.Name.L
if _, ok := modifyCols[name]; ok {
return dbterror.ErrOperateSameColumn.GenWithStackByArgs(name)
}
modifyCols[name] = struct{}{}
}
for _, col := range info.DropColumns {
name := col.Name.L
if _, ok := modifyCols[name]; ok {
return dbterror.ErrOperateSameColumn.GenWithStackByArgs(name)

checkColumns := func(colNames []model.CIStr, addToModifyCols bool) error {
for _, colName := range colNames {
name := colName.L
if _, ok := modifyCols[name]; ok {
return dbterror.ErrOperateSameColumn.GenWithStackByArgs(name)
}
if addToModifyCols {
modifyCols[name] = struct{}{}
}
}
modifyCols[name] = struct{}{}
return nil
}
for _, index := range info.AddIndexes {
idxName := index.Name.L
if _, ok := modifyIdx[idxName]; ok {
return dbterror.ErrOperateSameIndex.GenWithStackByArgs(idxName)
}
modifyIdx[idxName] = struct{}{}
for _, col := range index.Columns {
colName := col.Name.L
if _, ok := modifyCols[colName]; ok {
return dbterror.ErrOperateSameColumn.GenWithStackByArgs(colName)

checkIndexes := func(idxNames []model.CIStr, addToModifyIdx bool) error {
for _, idxName := range idxNames {
name := idxName.L
if _, ok := modifyIdx[name]; ok {
return dbterror.ErrOperateSameColumn.GenWithStackByArgs(name)
}
if addToModifyIdx {
modifyIdx[name] = struct{}{}
}
}
return nil
}
for _, index := range info.DropIndexes {
idxName := index.Name.L
if _, ok := modifyIdx[idxName]; ok {
return dbterror.ErrOperateSameIndex.GenWithStackByArgs(idxName)
}
modifyIdx[idxName] = struct{}{}

if err := checkColumns(info.AddColumns, true); err != nil {
return err
}
if err := checkColumns(info.DropColumns, true); err != nil {
return err
}
if err := checkIndexes(info.AddIndexes, true); err != nil {
return err
}
if err := checkIndexes(info.DropIndexes, true); err != nil {
return err
}

if err := checkColumns(info.RelativeColumns, false); err != nil {
return err
}
return nil
}
Expand Down
77 changes: 63 additions & 14 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,10 @@ func TestMultiSchemaChangeAddDropColumns(t *testing.T) {
tk.MustExec("alter table t drop column a, drop column b, add column c int default 3, add column d int default 4;")
tk.MustQuery("select * from t;").Check(testkit.Rows("3 4"))

// [a, b] -> [+c after a, +d first, -a, -b] -> [d, c]
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int default 1, b int default 2);")
tk.MustExec("insert into t values ();")
// Note that MariaDB does not support this: Unknown column 'a' in 't'.
// Since TiDB's implementation is snapshot + reasonable cascading, this is supported.
tk.MustExec("alter table t add column c int default 3 after a, add column d int default 4 first, drop column a, drop column b;")
tk.MustQuery("select * from t;").Check(testkit.Rows("4 3"))
tk.MustGetErrCode("alter table t add column c int default 3 after a, add column d int default 4 first, drop column a, drop column b;", errno.ErrUnsupportedDDLOperation)
}

func TestMultiSchemaRenameColumns(t *testing.T) {
Expand Down Expand Up @@ -190,6 +186,60 @@ func TestMultiSchemaRenameColumns(t *testing.T) {
tk.MustGetErrCode("alter table t rename column b to c, add index t1(a, b)", errno.ErrUnsupportedDDLOperation)
}

func TestMultiSchemaModifyColumns(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@global.tidb_enable_change_multi_schema = 1")

// Test modify and drop with same column
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int default 1, b int default 2);")
tk.MustExec("insert into t values ();")
tk.MustGetErrCode("alter table t modify column b double, drop column b", errno.ErrUnsupportedDDLOperation)
}

func TestMultiSchemaAlterColumns(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@global.tidb_enable_change_multi_schema = 1")

// Test alter and drop with same column
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int default 1, b int default 2);")
tk.MustExec("insert into t values ();")
tk.MustGetErrCode("alter table t alter column b set default 3, drop column b", errno.ErrUnsupportedDDLOperation)
}

func TestMultiSchemaChangeColumns(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@global.tidb_enable_change_multi_schema = 1")

// Test change and drop with same column
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int default 1, b int default 2);")
tk.MustExec("insert into t values ();")
tk.MustGetErrCode("alter table t change column b c double, drop column b", errno.ErrUnsupportedDDLOperation)

// Test change and add with same column
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int default 1, b int default 2);")
tk.MustExec("insert into t values ();")
tk.MustGetErrCode("alter table t change column b c double, add column c int", errno.ErrUnsupportedDDLOperation)

// Test add index and rename with same column
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int default 1, b int default 2, index t(a, b));")
tk.MustExec("insert into t values ();")
tk.MustGetErrCode("alter table t change column b c double, add index t1(a, b)", errno.ErrUnsupportedDDLOperation)
}

func TestMultiSchemaChangeAddIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down Expand Up @@ -229,20 +279,19 @@ func TestMultiSchemaChangeDropIndexes(t *testing.T) {
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t drop index t, drop index t", errno.ErrUnsupportedDDLOperation)

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2));")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t drop index i1, drop index i2;")
tk.MustGetErrCode("select * from t use index(i1);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(i2);", errno.ErrKeyDoesNotExist)

// Test drop index with drop column.
/*
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int default 1, b int default 2, c int default 3, index t(a))")
tk.MustExec("insert into t values ();")
tk.MustExec("alter table t drop index t, drop column a")
tk.MustGetErrCode("select * from t force index(t)", errno.ErrKeyDoesNotExist)
*/
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int default 1, b int default 2, c int default 3, index t(a))")
tk.MustExec("insert into t values ();")
tk.MustExec("alter table t drop index t, drop column a")
tk.MustGetErrCode("select * from t force index(t)", errno.ErrKeyDoesNotExist)
}

func TestMultiSchemaChangeAddDropIndexes(t *testing.T) {
Expand All @@ -255,12 +304,12 @@ func TestMultiSchemaChangeAddDropIndexes(t *testing.T) {
// Test add and drop same index.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t drop index t, add index t(b)", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t drop index t, add index t(b)", errno.ErrDupKeyName)

// Test add and drop same index.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t add index t1(b), drop index t1", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t add index t1(b), drop index t1", errno.ErrCantDropFieldOrKey)
}

func TestMultiSchemaRenameIndexes(t *testing.T) {
Expand Down
Loading

0 comments on commit 67eeacc

Please sign in to comment.