Skip to content

Commit

Permalink
ddl: support add indexes for multi-schema change (#35989)
Browse files Browse the repository at this point in the history
ref #14766
  • Loading branch information
tangenta authored Jul 7, 2022
1 parent f3eb0f5 commit a2cc6b8
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 41 deletions.
20 changes: 2 additions & 18 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,7 +3078,8 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error
func allSupported(specs []*ast.AlterTableSpec) bool {
for _, s := range specs {
switch s.Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey:
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey,
ast.AlterTableAddConstraint:
default:
return false
}
Expand Down Expand Up @@ -3115,23 +3116,6 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
return err
}

if len(validSpecs) > 1 {
useMultiSchemaChange := false
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn,
ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
useMultiSchemaChange = true
default:
return dbterror.ErrRunMultiSchemaChanges
}
if err != nil {
return errors.Trace(err)
}
if !useMultiSchemaChange {
return nil
}
}

if len(validSpecs) > 1 {
sctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo()
}
Expand Down
18 changes: 13 additions & 5 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,17 +414,25 @@ func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error
failpoint.Return(kv.ErrEntryTooLarge)
}
})
updateRawArgs := true
// If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs,
// so we shouldn't replace RawArgs with the marshaling Args.
if meetErr && (job.RawArgs != nil && job.Args == nil) {
updateRawArgs := needUpdateRawArgs(job, meetErr)
if !updateRawArgs {
logutil.Logger(w.logCtx).Info("[ddl] meet something wrong before update DDL job, shouldn't update raw args",
zap.String("job", job.String()))
updateRawArgs = false
}
return errors.Trace(t.UpdateDDLJob(0, job, updateRawArgs))
}

// needUpdateRawArgs reports whether the job's RawArgs may be replaced with the
// marshaled Args when the DDL job is written back.
//
// Replacing is only questionable when the job met an error while RawArgs is
// set but Args is still nil, i.e. the arguments have not been decoded by
// DecodeArgs. Even then, a multi-schema change job is allowed to update: the
// args of the parent job are always nil and Job.Encode() handles the sub-jobs
// properly.
func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
	// Common case: no error, or the args are in a state that is safe to re-encode.
	if !meetErr || job.RawArgs == nil || job.Args != nil {
		return true
	}
	// Errored with undecoded args: only multi-schema change jobs may update.
	return job.MultiSchemaInfo != nil
}

func (w *worker) deleteRange(ctx context.Context, job *model.Job) error {
var err error
if job.Version <= currentVersion {
Expand Down
55 changes: 44 additions & 11 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,31 @@ func checkPrimaryKeyNotNull(d *ddlCtx, w *worker, sqlMode mysql.SQLMode, t *meta
return nil, err
}

func updateHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, state model.SchemaState) {
// moveAndUpdateHiddenColumnsToPublic updates the hidden columns to public, and
// moves the hidden columns to proper offsets, so that Table.Columns' states meet the assumption of
// [public, public, ..., public, non-public, non-public, ..., non-public].
func moveAndUpdateHiddenColumnsToPublic(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
hiddenColOffset := make(map[int]struct{}, 0)
for _, col := range idxInfo.Columns {
if tblInfo.Columns[col.Offset].Hidden {
tblInfo.Columns[col.Offset].State = state
hiddenColOffset[col.Offset] = struct{}{}
}
}
if len(hiddenColOffset) == 0 {
return
}
// Find the first non-public column.
firstNonPublicPos := len(tblInfo.Columns) - 1
for i, c := range tblInfo.Columns {
if c.State != model.StatePublic {
firstNonPublicPos = i
break
}
}
for _, col := range idxInfo.Columns {
tblInfo.Columns[col.Offset].State = model.StatePublic
if _, needMove := hiddenColOffset[col.Offset]; needMove {
tblInfo.MoveColumnInfo(col.Offset, firstNonPublicPos)
}
}
}
Expand Down Expand Up @@ -469,13 +490,8 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo

if indexInfo == nil {
if len(hiddenCols) > 0 {
pos := &ast.ColumnPosition{Tp: ast.ColumnPositionNone}
for _, hiddenCol := range hiddenCols {
_, _, _, err = createColumnInfoWithPosCheck(tblInfo, hiddenCol, pos)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
initAndAddColumnToTable(tblInfo, hiddenCol)
}
}
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
Expand Down Expand Up @@ -532,7 +548,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
case model.StateNone:
// none -> delete only
indexInfo.State = model.StateDeleteOnly
updateHiddenColumns(tblInfo, indexInfo, model.StatePublic)
moveAndUpdateHiddenColumnsToPublic(tblInfo, indexInfo)
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, err
Expand Down Expand Up @@ -573,19 +589,23 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

var done bool
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
if job.MultiSchemaInfo != nil {
done, ver, err = doReorgWorkForCreateIndexMultiSchema(w, d, t, job, tbl, indexInfo)
} else {
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
}
if !done {
return ver, err
}

indexInfo.State = model.StatePublic
// Set column index flag.
addIndexColumnFlag(tblInfo, indexInfo)
if isPK {
if err = updateColsNull2NotNull(tblInfo, indexInfo); err != nil {
return ver, errors.Trace(err)
}
}
indexInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -599,6 +619,19 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
return ver, errors.Trace(err)
}

// doReorgWorkForCreateIndexMultiSchema drives the reorganization step of an
// add-index sub-job that belongs to a multi-schema change.
//
// While the multi-schema job is still revertible, it delegates to
// doReorgWorkForCreateIndex and, once that reports completion, marks the job
// as non-revertible. It then always reports done == false for this round so
// that the other sub-jobs get a chance to finish. When the job is already
// non-revertible it reports done == true without doing any further work.
func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
	tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
	if !job.MultiSchemaInfo.Revertible {
		// Reorg was completed in an earlier round; nothing to run here.
		return true, 0, nil
	}

	var reorgFinished bool
	reorgFinished, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
	if reorgFinished {
		job.MarkNonRevertible()
	}
	// We need another round to wait for all the other sub-jobs to finish.
	return false, ver, err
}

func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
Expand Down
36 changes: 35 additions & 1 deletion ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
err = handleRollbackException(err, proxyJob.Error)
if err != nil {
return ver, err
}
sub.FromProxyJob(&proxyJob)
return ver, err
return ver, nil
}
// The last rollback/cancelling sub-job is done.
job.State = model.JobStateRollbackDone
Expand Down Expand Up @@ -154,6 +158,22 @@ func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror
}
}

// handleRollbackException decides which error, if any, should be surfaced
// after running a sub-job during the rollback of a multi-schema change.
//
// runJobErr is the error returned by running the proxy job; proxyJobErr is the
// error recorded on the proxy job itself. The result is nil when there is
// nothing to report.
func handleRollbackException(runJobErr error, proxyJobErr *terror.Error) error {
	switch {
	case runJobErr != nil:
		// The physical errors are not recoverable during rolling back.
		// Returning the error makes the caller keep retrying.
		return runJobErr
	case proxyJobErr == nil:
		return nil
	case proxyJobErr.Equal(dbterror.ErrCancelledDDLJob):
		// A cancelled DDL error is normal during rolling back.
		return nil
	default:
		return proxyJobErr
	}
}

func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error {
err := fillMultiSchemaInfo(m, job)
if err != nil {
Expand Down Expand Up @@ -189,6 +209,20 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
case model.ActionDropIndex, model.ActionDropPrimaryKey:
indexName := job.Args[0].(model.CIStr)
info.DropIndexes = append(info.DropIndexes, indexName)
case model.ActionAddIndex, model.ActionAddPrimaryKey:
indexName := job.Args[1].(model.CIStr)
indexPartSpecifications := job.Args[2].([]*ast.IndexPartSpecification)
info.AddIndexes = append(info.AddIndexes, indexName)
for _, indexPartSpecification := range indexPartSpecifications {
info.RelativeColumns = append(info.RelativeColumns, indexPartSpecification.Column.Name)
}
if hiddenCols, ok := job.Args[4].([]*model.ColumnInfo); ok {
for _, c := range hiddenCols {
for depColName := range c.Dependences {
info.RelativeColumns = append(info.RelativeColumns, model.NewCIStr(depColName))
}
}
}
default:
return dbterror.ErrRunMultiSchemaChanges
}
Expand Down
151 changes: 151 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -289,6 +290,86 @@ func TestMultiSchemaChangeAddDropColumns(t *testing.T) {
tk.MustGetErrCode("alter table t add column c int default 3 after a, add column d int default 4 first, drop column a, drop column b;", errno.ErrUnsupportedDDLOperation)
}

// TestMultiSchemaChangeAddIndexes covers ALTER TABLE statements that add
// several indexes at once: overlapping columns, duplicate index names,
// mixing with DROP COLUMN, and a failing unique index.
func TestMultiSchemaChangeAddIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

// Test add multiple indexes with same column.
// All four indexes must be usable afterwards and the table must pass the consistency check.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t add index t(a, b), add index t1(a);")
tk.MustExec("alter table t add index t2(a), add index t3(a, b);")
tk.MustQuery("select * from t use index (t, t1, t2, t3);").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")

// Test add multiple indexes with same name.
// The statement is rejected and no index is left behind.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int)")
tk.MustGetErrCode("alter table t add index t(a), add index t(b)", errno.ErrUnsupportedDDLOperation)
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))

// Test add indexes with drop column.
// Indexing a column that is dropped in the same statement is rejected.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int)")
tk.MustGetErrCode("alter table t add index t(a), drop column a", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t add index t(a, b), drop column a", errno.ErrUnsupportedDDLOperation)
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))

// Test add index failed.
// Column c holds the value 1 twice, so the unique index i3(c) hits a duplicate entry;
// the check below expects that none of i1, i2, i3 remains on the table.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 1);")
tk.MustGetErrCode("alter table t add unique index i1(a), add unique index i2(a, b), add unique index i3(c);",
errno.ErrDupEntry)
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))
// The same index names can be added afterwards as non-unique indexes.
tk.MustExec("alter table t add index i1(a), add index i2(a, b), add index i3(c);")
}

// TestMultiSchemaChangeAddIndexesCancelled checks cancelling a multi-schema
// change that adds four indexes: once at a point where the cancel is expected
// to succeed, and once at a point where it is expected to fail.
func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// Remember the original DDL hook so it can be restored after each scenario.
originHook := dom.DDL().GetHook()

// Test cancel successfully.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
// NOTE(review): newCancelJobHook is defined outside this view; presumably it
// cancels the job once the predicate returns true — confirm against its definition.
cancelHook := newCancelJobHook(store, dom, func(job *model.Job) bool {
// Cancel the job when index 't2' is in write-reorg.
// SubJobs[2] is the third spec of the ALTER statement below, i.e. 'add index t2(a)'.
return job.MultiSchemaInfo.SubJobs[2].SchemaState == model.StateWriteReorganization
})
dom.DDL().SetHook(cancelHook)
tk.MustGetErrCode("alter table t "+
"add index t(a, b), add index t1(a), "+
"add index t2(a), add index t3(a, b);", errno.ErrCancelledDDLJob)
dom.DDL().SetHook(originHook)
cancelHook.MustCancelDone(t)
// After the cancelled statement: no index exists, the row is intact, and the table is consistent.
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")

// Test cancel failed when some sub-jobs have been finished.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
cancelHook = newCancelJobHook(store, dom, func(job *model.Job) bool {
// Cancel the job when index 't1' is in public.
// SubJobs[1] is the second spec of the ALTER statement below, i.e. 'add index t1(a)'.
return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StatePublic
})
dom.DDL().SetHook(cancelHook)
// The ALTER statement itself is expected to complete without error.
tk.MustExec("alter table t add index t(a, b), add index t1(a), " +
"add index t2(a), add index t3(a, b);")
dom.DDL().SetHook(originHook)
cancelHook.MustCancelFailed(t)
// All four indexes must be usable and the table must be consistent.
tk.MustQuery("select * from t use index(t, t1, t2, t3);").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")
}

func TestMultiSchemaChangeDropIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down Expand Up @@ -372,6 +453,76 @@ func TestMultiSchemaChangeDropIndexesParallel(t *testing.T) {
})
}

// TestMultiSchemaChangeAddDropIndexes covers ALTER TABLE statements that mix
// ADD INDEX and DROP INDEX in a single multi-schema change.
func TestMultiSchemaChangeAddDropIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

// Test dropping an existing index and adding a new index with the same name
// in one statement: expected to fail with a duplicate key name error.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t drop index t, add index t(b)", errno.ErrDupKeyName)

// Test adding an index and dropping that same, not-yet-existing index in one
// statement: expected to fail because the index to drop does not exist.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t add index t1(b), drop index t1", errno.ErrCantDropFieldOrKey)

// Test interleaving adds and drops of different indexes in one statement:
// the new indexes aa, bb, cc replace the old indexes a, b, c.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index (a), index(b), index(c));")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t add index aa(a), drop index a, add index cc(c), drop index b, drop index c, add index bb(b);")
tk.MustQuery("select * from t use index(aa, bb, cc);").Check(testkit.Rows("1 2 3"))
// The dropped indexes must no longer be resolvable.
tk.MustGetErrCode("select * from t use index(a);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(b);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(c);", errno.ErrKeyDoesNotExist)
tk.MustExec("admin check table t;")
}

// TestMultiSchemaChangeWithExpressionIndex covers multi-schema changes that
// add expression indexes, including rejected combinations, a unique-key
// violation, and concurrent DML issued while the DDL job is running.
func TestMultiSchemaChangeWithExpressionIndex(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 2), (2, 1);")
// An expression index over a column that is dropped or changed in the same statement is rejected.
tk.MustGetErrCode("alter table t drop column a, add unique index idx((a + b));", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t add column c int, change column a d bigint, add index idx((a + a));", errno.ErrUnsupportedDDLOperation)
// Both rows evaluate a + b to 3, so the unique index idx2 hits a duplicate entry.
tk.MustGetErrCode("alter table t add column c int default 10, add index idx1((a + b)), add unique index idx2((a + b));",
errno.ErrDupEntry)
// The table still has only its two original columns and rows.
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2", "2 1"))

originHook := dom.DDL().GetHook()
hook := &ddl.TestDDLCallback{Do: dom}
// checkErr records the first error produced by the DML run inside the hook.
var checkErr error
hook.OnJobRunBeforeExported = func(job *model.Job) {
if checkErr != nil {
return
}
assert.Equal(t, model.ActionMultiSchemaChange, job.Type)
// SubJobs[1] is the second spec of the ALTER statement below, i.e. 'add index idx1((a + b))'.
if job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StateWriteOnly {
// Run DML from a separate session while the sub-job is in the write-only state.
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test;")
_, checkErr = tk2.Exec("update t set a = 3 where a = 1;")
if checkErr != nil {
return
}
_, checkErr = tk2.Exec("insert into t values (10, 10);")
}
}
dom.DDL().SetHook(hook)
// Same statement as the failing one above; this time it is expected to succeed.
// NOTE(review): presumably because the update in the hook changes row (1, 2) to (3, 2),
// which makes the a + b values distinct before the unique check — confirm.
tk.MustExec("alter table t add column c int default 10, add index idx1((a + b)), add unique index idx2((a + b));")
require.NoError(t, checkErr)
dom.DDL().SetHook(originHook)

// With a*10 + b the two rows evaluate to 12 and 21, so the unique index can be built;
// the new column c is filled with its default value 10.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 2), (2, 1);")
tk.MustExec("alter table t add column c int default 10, add index idx1((a + b)), add unique index idx2((a*10 + b));")
tk.MustQuery("select * from t use index(idx1, idx2);").Check(testkit.Rows("1 2 10", "2 1 10"))
}

type cancelOnceHook struct {
store kv.Storage
triggered bool
Expand Down
Loading

0 comments on commit a2cc6b8

Please sign in to comment.