Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support add indexes for multi-schema change #35989

Merged
merged 13 commits into from
Jul 7, 2022
20 changes: 2 additions & 18 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,7 +3078,8 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error
func allSupported(specs []*ast.AlterTableSpec) bool {
for _, s := range specs {
switch s.Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey:
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey,
ast.AlterTableAddConstraint:
default:
return false
}
Expand Down Expand Up @@ -3115,23 +3116,6 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
return err
}

if len(validSpecs) > 1 {
useMultiSchemaChange := false
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn,
ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
useMultiSchemaChange = true
default:
return dbterror.ErrRunMultiSchemaChanges
}
if err != nil {
return errors.Trace(err)
}
if !useMultiSchemaChange {
return nil
}
}

if len(validSpecs) > 1 {
sctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo()
}
Expand Down
18 changes: 13 additions & 5 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,17 +414,25 @@ func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error
failpoint.Return(kv.ErrEntryTooLarge)
}
})
updateRawArgs := true
// If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs,
// so we shouldn't replace RawArgs with the marshaling Args.
if meetErr && (job.RawArgs != nil && job.Args == nil) {
updateRawArgs := needUpdateRawArgs(job, meetErr)
if !updateRawArgs {
logutil.Logger(w.logCtx).Info("[ddl] meet something wrong before update DDL job, shouldn't update raw args",
zap.String("job", job.String()))
updateRawArgs = false
}
return errors.Trace(t.UpdateDDLJob(0, job, updateRawArgs))
}

// needUpdateRawArgs reports whether the job's RawArgs should be replaced by
// the marshaled Args when the job is written back.
//
// It returns false only when an error was met, RawArgs is set, Args is still
// nil (i.e. RawArgs has not been decoded into Args), and the job carries no
// MultiSchemaInfo. In every other case it returns true.
func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
	// No error, or nothing that could be clobbered: updating is always fine.
	if !meetErr || job.RawArgs == nil || job.Args != nil {
		return true
	}
	// An error happened before RawArgs was decoded. Normally the raw args must
	// be kept as they are. A multi-schema change parent job is the exception:
	// its Args is always nil, and Job.Encode() takes care of the sub-jobs, so
	// refreshing the raw args is safe there.
	isMultiSchemaChange := job.MultiSchemaInfo != nil
	return isMultiSchemaChange
}

func (w *worker) deleteRange(ctx context.Context, job *model.Job) error {
var err error
if job.Version <= currentVersion {
Expand Down
55 changes: 44 additions & 11 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,31 @@ func checkPrimaryKeyNotNull(d *ddlCtx, w *worker, sqlMode mysql.SQLMode, t *meta
return nil, err
}

func updateHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, state model.SchemaState) {
// moveAndUpdateHiddenColumnsToPublic updates the hidden columns to public, and
// moves the hidden columns to proper offsets, so that Table.Columns' states meet the assumption of
// [public, public, ..., public, non-public, non-public, ..., non-public].
func moveAndUpdateHiddenColumnsToPublic(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
hiddenColOffset := make(map[int]struct{}, 0)
for _, col := range idxInfo.Columns {
if tblInfo.Columns[col.Offset].Hidden {
tblInfo.Columns[col.Offset].State = state
hiddenColOffset[col.Offset] = struct{}{}
}
}
if len(hiddenColOffset) == 0 {
return
}
// Find the first non-public column.
firstNonPublicPos := len(tblInfo.Columns) - 1
for i, c := range tblInfo.Columns {
if c.State != model.StatePublic {
firstNonPublicPos = i
break
}
}
for _, col := range idxInfo.Columns {
tblInfo.Columns[col.Offset].State = model.StatePublic
if _, needMove := hiddenColOffset[col.Offset]; needMove {
tblInfo.MoveColumnInfo(col.Offset, firstNonPublicPos)
tangenta marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -469,13 +490,8 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo

if indexInfo == nil {
if len(hiddenCols) > 0 {
pos := &ast.ColumnPosition{Tp: ast.ColumnPositionNone}
for _, hiddenCol := range hiddenCols {
_, _, _, err = createColumnInfoWithPosCheck(tblInfo, hiddenCol, pos)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
initAndAddColumnToTable(tblInfo, hiddenCol)
}
}
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
Expand Down Expand Up @@ -532,7 +548,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
case model.StateNone:
// none -> delete only
indexInfo.State = model.StateDeleteOnly
updateHiddenColumns(tblInfo, indexInfo, model.StatePublic)
moveAndUpdateHiddenColumnsToPublic(tblInfo, indexInfo)
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, err
Expand Down Expand Up @@ -573,19 +589,23 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

var done bool
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
if job.MultiSchemaInfo != nil {
done, ver, err = doReorgWorkForCreateIndexMultiSchema(w, d, t, job, tbl, indexInfo)
} else {
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
}
if !done {
return ver, err
}

indexInfo.State = model.StatePublic
// Set column index flag.
addIndexColumnFlag(tblInfo, indexInfo)
if isPK {
if err = updateColsNull2NotNull(tblInfo, indexInfo); err != nil {
return ver, errors.Trace(err)
}
}
indexInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -599,6 +619,19 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
return ver, errors.Trace(err)
}

// doReorgWorkForCreateIndexMultiSchema wraps doReorgWorkForCreateIndex for an
// add-index step that runs as part of a multi-schema change.
//
// While job.MultiSchemaInfo.Revertible is set, it runs the reorg work, marks
// the job non-revertible once the reorg reports done, and always returns
// done=false so the caller comes back for another round. When Revertible is
// not set, it does no work and returns done=true with zero ver and nil err.
func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
	tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
	if !job.MultiSchemaInfo.Revertible {
		// The reorg step was already completed in an earlier round.
		return true, 0, nil
	}

	var reorgDone bool
	reorgDone, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
	if reorgDone {
		job.MarkNonRevertible()
	}
	// Report "not done" even if the reorg finished: one more round is needed
	// to wait for all the other sub-jobs to finish.
	return false, ver, err
}

func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
Expand Down
36 changes: 35 additions & 1 deletion ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
err = handleRollbackException(err, proxyJob.Error)
if err != nil {
return ver, err
}
sub.FromProxyJob(&proxyJob)
return ver, err
return ver, nil
}
// The last rollback/cancelling sub-job is done.
job.State = model.JobStateRollbackDone
Expand Down Expand Up @@ -154,6 +158,22 @@ func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror
}
}

// handleRollbackException decides which error, if any, should be surfaced
// after running a sub-job while rolling back.
//
//   - A non-nil runJobErr always wins and is returned unchanged.
//   - Otherwise a nil proxyJobErr, or one equal to
//     dbterror.ErrCancelledDDLJob, yields nil.
//   - Any other proxyJobErr is returned as the error.
func handleRollbackException(runJobErr error, proxyJobErr *terror.Error) error {
	switch {
	case runJobErr != nil:
		// Physical errors are not recoverable during rollback; hand the error
		// back so the caller keeps retrying.
		return runJobErr
	case proxyJobErr == nil:
		return nil
	case proxyJobErr.Equal(dbterror.ErrCancelledDDLJob):
		// Seeing a "cancelled DDL job" error is expected while rolling back.
		return nil
	default:
		return proxyJobErr
	}
}

func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error {
err := fillMultiSchemaInfo(m, job)
if err != nil {
Expand Down Expand Up @@ -189,6 +209,20 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
case model.ActionDropIndex, model.ActionDropPrimaryKey:
indexName := job.Args[0].(model.CIStr)
info.DropIndexes = append(info.DropIndexes, indexName)
case model.ActionAddIndex, model.ActionAddPrimaryKey:
indexName := job.Args[1].(model.CIStr)
indexPartSpecifications := job.Args[2].([]*ast.IndexPartSpecification)
info.AddIndexes = append(info.AddIndexes, indexName)
for _, indexPartSpecification := range indexPartSpecifications {
info.RelativeColumns = append(info.RelativeColumns, indexPartSpecification.Column.Name)
}
if hiddenCols, ok := job.Args[4].([]*model.ColumnInfo); ok {
for _, c := range hiddenCols {
for depColName := range c.Dependences {
info.RelativeColumns = append(info.RelativeColumns, model.NewCIStr(depColName))
}
}
}
default:
return dbterror.ErrRunMultiSchemaChanges
}
Expand Down
151 changes: 151 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -289,6 +290,86 @@ func TestMultiSchemaChangeAddDropColumns(t *testing.T) {
tk.MustGetErrCode("alter table t add column c int default 3 after a, add column d int default 4 first, drop column a, drop column b;", errno.ErrUnsupportedDDLOperation)
}

func TestMultiSchemaChangeAddIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

// Test add multiple indexes with same column.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t add index t(a, b), add index t1(a);")
tk.MustExec("alter table t add index t2(a), add index t3(a, b);")
tk.MustQuery("select * from t use index (t, t1, t2, t3);").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")

// Test add multiple indexes with same name.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int)")
tk.MustGetErrCode("alter table t add index t(a), add index t(b)", errno.ErrUnsupportedDDLOperation)
tangenta marked this conversation as resolved.
Show resolved Hide resolved
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))

// Test add indexes with drop column.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int)")
tk.MustGetErrCode("alter table t add index t(a), drop column a", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t add index t(a, b), drop column a", errno.ErrUnsupportedDDLOperation)
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))

// Test add index failed.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 1);")
tk.MustGetErrCode("alter table t add unique index i1(a), add unique index i2(a, b), add unique index i3(c);",
errno.ErrDupEntry)
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))
tk.MustExec("alter table t add index i1(a), add index i2(a, b), add index i3(c);")
tangenta marked this conversation as resolved.
Show resolved Hide resolved
}

// TestMultiSchemaChangeAddIndexesCancelled checks cancelling an ALTER TABLE
// statement that adds four indexes at once. It covers two scenarios: a cancel
// that takes effect (the statement fails and no index remains), and a cancel
// that is attempted too late (the statement succeeds and all indexes exist).
func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// Remember the current DDL hook so it can be restored after each scenario.
originHook := dom.DDL().GetHook()

// Scenario 1: the cancel succeeds.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
cancelHook := newCancelJobHook(store, dom, func(job *model.Job) bool {
// Cancel the job when index 't2' is in write-reorg.
// SubJobs[2] corresponds to the third spec in the ALTER statement below.
return job.MultiSchemaInfo.SubJobs[2].SchemaState == model.StateWriteReorganization
})
dom.DDL().SetHook(cancelHook)
// The whole statement is expected to fail with the "cancelled DDL job" error.
tk.MustGetErrCode("alter table t "+
"add index t(a, b), add index t1(a), "+
"add index t2(a), add index t3(a, b);", errno.ErrCancelledDDLJob)
dom.DDL().SetHook(originHook)
// NOTE(review): presumably asserts that the cancel request itself succeeded — confirm against cancelHook's definition.
cancelHook.MustCancelDone(t)
// None of the four indexes may remain, and the row data must be intact.
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")

// Scenario 2: the cancel fails because some sub-jobs have already finished.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
cancelHook = newCancelJobHook(store, dom, func(job *model.Job) bool {
// Cancel the job when index 't1' is in public.
// SubJobs[1] corresponds to the second spec in the ALTER statement below.
return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StatePublic
})
dom.DDL().SetHook(cancelHook)
// Here the statement is expected to complete without error.
tk.MustExec("alter table t add index t(a, b), add index t1(a), " +
"add index t2(a), add index t3(a, b);")
dom.DDL().SetHook(originHook)
// NOTE(review): presumably asserts that the cancel request was rejected — confirm against cancelHook's definition.
cancelHook.MustCancelFailed(t)
// All four indexes must be usable and consistent with the table data.
tk.MustQuery("select * from t use index(t, t1, t2, t3);").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")
}

func TestMultiSchemaChangeDropIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down Expand Up @@ -372,6 +453,76 @@ func TestMultiSchemaChangeDropIndexesParallel(t *testing.T) {
})
}

// TestMultiSchemaChangeAddDropIndexes checks ALTER TABLE statements that mix
// "add index" and "drop index" specs: two conflicting combinations that must
// be rejected, and one combination on distinct index names that must succeed.
func TestMultiSchemaChangeAddDropIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

// Test dropping an existing index and re-adding an index with the same name
// in one statement: expected to fail with a duplicate key name error.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t drop index t, add index t(b)", errno.ErrDupKeyName)

// Test adding a new index and dropping that same new index in one statement:
// expected to fail because the index to drop cannot be dropped.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t add index t1(b), drop index t1", errno.ErrCantDropFieldOrKey)

// Test interleaved adds and drops on distinct index names: the new indexes
// aa/bb/cc must be usable and the old indexes a/b/c must be gone.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index (a), index(b), index(c));")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t add index aa(a), drop index a, add index cc(c), drop index b, drop index c, add index bb(b);")
tk.MustQuery("select * from t use index(aa, bb, cc);").Check(testkit.Rows("1 2 3"))
tk.MustGetErrCode("select * from t use index(a);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(b);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(c);", errno.ErrKeyDoesNotExist)
tk.MustExec("admin check table t;")
}

func TestMultiSchemaChangeWithExpressionIndex(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 2), (2, 1);")
tk.MustGetErrCode("alter table t drop column a, add unique index idx((a + b));", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t add column c int, change column a d bigint, add index idx((a + a));", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t add column c int default 10, add index idx1((a + b)), add unique index idx2((a + b));",
errno.ErrDupEntry)
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2", "2 1"))

originHook := dom.DDL().GetHook()
hook := &ddl.TestDDLCallback{Do: dom}
var checkErr error
hook.OnJobRunBeforeExported = func(job *model.Job) {
if checkErr != nil {
return
}
assert.Equal(t, model.ActionMultiSchemaChange, job.Type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If checkErr is already non-nil, we should skip the remaining logic; otherwise, the value of checkErr will be overwritten.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Thanks!

if job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StateWriteOnly {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test;")
_, checkErr = tk2.Exec("update t set a = 3 where a = 1;")
if checkErr != nil {
return
}
_, checkErr = tk2.Exec("insert into t values (10, 10);")
}
}
dom.DDL().SetHook(hook)
tk.MustExec("alter table t add column c int default 10, add index idx1((a + b)), add unique index idx2((a + b));")
require.NoError(t, checkErr)
dom.DDL().SetHook(originHook)

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t values (1, 2), (2, 1);")
tk.MustExec("alter table t add column c int default 10, add index idx1((a + b)), add unique index idx2((a*10 + b));")
tk.MustQuery("select * from t use index(idx1, idx2);").Check(testkit.Rows("1 2 10", "2 1 10"))
}

type cancelOnceHook struct {
store kv.Storage
triggered bool
Expand Down
Loading