From f774e5ff8592da209164a39fcfbb2163f41ec44c Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 6 Jul 2022 17:31:34 +0800 Subject: [PATCH 1/8] ddl: support add indexes for multi-schema change --- ddl/ddl_api.go | 20 +----- ddl/ddl_worker.go | 21 +++++-- ddl/index.go | 57 +++++++++++++---- ddl/multi_schema_change.go | 36 ++++++++++- ddl/multi_schema_change_test.go | 105 ++++++++++++++++++++++++++++++++ ddl/rollingback.go | 24 ++++++-- ddl/sanity_check.go | 8 +++ parser/model/ddl.go | 8 +++ 8 files changed, 238 insertions(+), 41 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 9082944257c04..10e36a0d10a11 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3078,7 +3078,8 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error func allSupported(specs []*ast.AlterTableSpec) bool { for _, s := range specs { switch s.Tp { - case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey: + case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey, + ast.AlterTableAddConstraint: default: return false } @@ -3115,23 +3116,6 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast return err } - if len(validSpecs) > 1 { - useMultiSchemaChange := false - switch validSpecs[0].Tp { - case ast.AlterTableAddColumns, ast.AlterTableDropColumn, - ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex: - useMultiSchemaChange = true - default: - return dbterror.ErrRunMultiSchemaChanges - } - if err != nil { - return errors.Trace(err) - } - if !useMultiSchemaChange { - return nil - } - } - if len(validSpecs) > 1 { sctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo() } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 2e1d50435189f..b0e31586426a6 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -414,17 +414,28 @@ func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error failpoint.Return(kv.ErrEntryTooLarge) } }) - updateRawArgs := true - // If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs, - // so we shouldn't replace RawArgs with the marshaling Args. - if meetErr && (job.RawArgs != nil && job.Args == nil) { + updateRawArgs := needToUpdateRawArgs(job, meetErr) + if !updateRawArgs { logutil.Logger(w.logCtx).Info("[ddl] meet something wrong before update DDL job, shouldn't update raw args", zap.String("job", job.String())) - updateRawArgs = false } return errors.Trace(t.UpdateDDLJob(0, job, updateRawArgs)) } +func needToUpdateRawArgs(job *model.Job, meetErr bool) bool { + if meetErr && job.RawArgs != nil && job.Args == nil { + // If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs, + // so we shouldn't replace RawArgs with the marshaling Args. + if job.MultiSchemaInfo != nil { + // However, for multi-schema change, the args of the parent job is always nil. + // Since Job.Encode() can handle the sub-jobs properly, we can safely update the raw args. + return true + } + return false + } + return true +} + func (w *worker) deleteRange(ctx context.Context, job *model.Job) error { var err error if job.Version <= currentVersion { diff --git a/ddl/index.go b/ddl/index.go index 4016cee19f59a..797195b4cb66e 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -396,10 +396,31 @@ func checkPrimaryKeyNotNull(d *ddlCtx, w *worker, sqlMode mysql.SQLMode, t *meta return nil, err } -func updateHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, state model.SchemaState) { +// moveAndUpdateHiddenColumnsToPublic updates the hidden columns to public, and +// moves the hidden columns to proper offsets, so that Table.Columns' states meet the assumption of +// [public, public, ..., public, non-public, non-public, ..., non-public]. +func moveAndUpdateHiddenColumnsToPublic(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) { + hiddenColOffset := make(map[int]struct{}, 0) for _, col := range idxInfo.Columns { if tblInfo.Columns[col.Offset].Hidden { - tblInfo.Columns[col.Offset].State = state + hiddenColOffset[col.Offset] = struct{}{} + } + } + if len(hiddenColOffset) == 0 { + return + } + // Find the first non-public column. + firstNonPublicPos := len(tblInfo.Columns) - 1 + for i, c := range tblInfo.Columns { + if c.State != model.StatePublic { + firstNonPublicPos = i + break + } + } + for _, col := range idxInfo.Columns { + tblInfo.Columns[col.Offset].State = model.StatePublic + if _, needMove := hiddenColOffset[col.Offset]; needMove { + tblInfo.MoveColumnInfo(col.Offset, firstNonPublicPos) } } } @@ -469,12 +490,9 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo if indexInfo == nil { if len(hiddenCols) > 0 { - pos := &ast.ColumnPosition{Tp: ast.ColumnPositionNone} - for _, hiddenCol := range hiddenCols { - _, _, _, err = createColumnInfoWithPosCheck(tblInfo, hiddenCol, pos) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) + if len(hiddenCols) > 0 { + for _, hiddenCol := range hiddenCols { + initAndAddColumnToTable(tblInfo, hiddenCol) } } } @@ -532,7 +550,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo case model.StateNone: // none -> delete only indexInfo.State = model.StateDeleteOnly - updateHiddenColumns(tblInfo, indexInfo, model.StatePublic) + moveAndUpdateHiddenColumnsToPublic(tblInfo, indexInfo) ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != indexInfo.State) if err != nil { return ver, err @@ -573,12 +591,15 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } var done bool - done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) + if job.MultiSchemaInfo != nil { + done, ver, err = doReorgWorkForCreateIndexMultiSchema(w, d, t, job, tbl, indexInfo) + } else { + done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) + } if !done { return ver, err } - indexInfo.State = model.StatePublic // Set column index flag. addIndexColumnFlag(tblInfo, indexInfo) if isPK { @@ -586,6 +607,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo return ver, errors.Trace(err) } } + indexInfo.State = model.StatePublic ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State) if err != nil { return ver, errors.Trace(err) @@ -599,6 +621,19 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo return ver, errors.Trace(err) } +func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, + tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { + if job.MultiSchemaInfo.Revertible { + done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) + if done { + job.MarkNonRevertible() + done = false // We need another round to wait for all the others sub-jobs to finish. + } + return done, ver, err + } + return true, ver, err +} + func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 20e62d89ed22b..6362185dde9d3 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -69,8 +69,12 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve } proxyJob := sub.ToProxyJob(job) ver, err = w.runDDLJob(d, t, &proxyJob) + err = handleRollbackException(err, proxyJob.Error) + if err != nil { + return ver, err + } sub.FromProxyJob(&proxyJob) - return ver, err + return ver, nil } // The last rollback/cancelling sub-job is done. job.State = model.JobStateRollbackDone @@ -154,6 +158,22 @@ func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror } } +func handleRollbackException(runJobErr error, proxyJobErr *terror.Error) error { + if runJobErr != nil { + // The physical errors are not recoverable during rolling back. + // We keep retrying it. + return runJobErr + } + if proxyJobErr != nil { + if proxyJobErr.Equal(dbterror.ErrCancelledDDLJob) { + // A cancelled DDL error is normal during rolling back. + return nil + } + return proxyJobErr + } + return nil +} + func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error { err := fillMultiSchemaInfo(m, job) if err != nil { @@ -189,6 +209,20 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error case model.ActionDropIndex, model.ActionDropPrimaryKey: indexName := job.Args[0].(model.CIStr) info.DropIndexes = append(info.DropIndexes, indexName) + case model.ActionAddIndex, model.ActionAddPrimaryKey: + indexName := job.Args[1].(model.CIStr) + indexPartSpecifications := job.Args[2].([]*ast.IndexPartSpecification) + info.AddIndexes = append(info.AddIndexes, indexName) + for _, indexPartSpecification := range indexPartSpecifications { + info.RelativeColumns = append(info.RelativeColumns, indexPartSpecification.Column.Name) + } + if hiddenCols, ok := job.Args[4].([]*model.ColumnInfo); ok { + for _, c := range hiddenCols { + for depColName := range c.Dependences { + info.RelativeColumns = append(info.RelativeColumns, model.NewCIStr(depColName)) + } + } + } default: return dbterror.ErrRunMultiSchemaChanges } diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 73f286a67efd1..34dac87195a0e 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -289,6 +289,84 @@ func TestMultiSchemaChangeAddDropColumns(t *testing.T) { tk.MustGetErrCode("alter table t add column c int default 3 after a, add column d int default 4 first, drop column a, drop column b;", errno.ErrUnsupportedDDLOperation) } +func TestMultiSchemaChangeAddIndexes(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + // Test add multiple indexes with same column. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, c int);") + tk.MustExec("insert into t values (1, 2, 3);") + tk.MustExec("alter table t add index t(a, b), add index t1(a);") + tk.MustExec("alter table t add index t2(a), add index t3(a, b);") + tk.MustQuery("select * from t use index (t, t1, t2, t3);").Check(testkit.Rows("1 2 3")) + tk.MustExec("admin check table t;") + + // Test add multiple indexes with same name. + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int, b int, c int)") + tk.MustGetErrCode("alter table t add index t(a), add index t(b)", errno.ErrUnsupportedDDLOperation) + + // Test add indexes with drop column. + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int, b int, c int)") + tk.MustGetErrCode("alter table t add index t(a), drop column a", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t add index t(a, b), drop column a", errno.ErrUnsupportedDDLOperation) + + // Test add index failed. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, c int);") + tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 1);") + tk.MustGetErrCode("alter table t add unique index i1(a), add unique index i2(a, b), add unique index i3(c);", + errno.ErrDupEntry) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustExec("alter table t add index i1(a), add index i2(a, b), add index i3(c);") +} + +func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + originHook := dom.DDL().GetHook() + + // Test cancel successfully. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, c int);") + tk.MustExec("insert into t values (1, 2, 3);") + cancelHook := newCancelJobHook(store, dom, func(job *model.Job) bool { + // Cancel the job when index 't2' is in write-reorg. + return job.MultiSchemaInfo.SubJobs[2].SchemaState == model.StateWriteReorganization + }) + dom.DDL().SetHook(cancelHook) + tk.MustGetErrCode("alter table t "+ + "add index t(a, b), add index t1(a), "+ + "add index t2(a), add index t3(a, b);", errno.ErrCancelledDDLJob) + dom.DDL().SetHook(originHook) + cancelHook.MustCancelDone(t) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) + tk.MustExec("admin check table t;") + + // Test cancel failed when some sub-jobs have been finished. + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, c int);") + tk.MustExec("insert into t values (1, 2, 3);") + cancelHook = newCancelJobHook(store, dom, func(job *model.Job) bool { + // Cancel the job when index 't1' is in public. + return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StatePublic + }) + dom.DDL().SetHook(cancelHook) + tk.MustExec("alter table t add index t(a, b), add index t1(a), " + + "add index t2(a), add index t3(a, b);") + dom.DDL().SetHook(originHook) + cancelHook.MustCancelFailed(t) + tk.MustQuery("select * from t use index(t, t1, t2, t3);").Check(testkit.Rows("1 2 3")) + tk.MustExec("admin check table t;") +} + func TestMultiSchemaChangeDropIndexes(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() @@ -372,6 +450,33 @@ func TestMultiSchemaChangeDropIndexesParallel(t *testing.T) { }) } +func TestMultiSchemaChangeAddDropIndexes(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + // Test add and drop same index. + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int, b int, c int, index t(a))") + tk.MustGetErrCode("alter table t drop index t, add index t(b)", errno.ErrDupKeyName) + + // Test add and drop same index. + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int, b int, c int, index t(a))") + tk.MustGetErrCode("alter table t add index t1(b), drop index t1", errno.ErrCantDropFieldOrKey) + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int, c int, index (a), index(b), index(c));") + tk.MustExec("insert into t values (1, 2, 3);") + tk.MustExec("alter table t add index aa(a), drop index a, add index cc(c), drop index b, drop index c, add index bb(b);") + tk.MustQuery("select * from t use index(aa, bb, cc);").Check(testkit.Rows("1 2 3")) + tk.MustGetErrCode("select * from t use index(a);", errno.ErrKeyDoesNotExist) + tk.MustGetErrCode("select * from t use index(b);", errno.ErrKeyDoesNotExist) + tk.MustGetErrCode("select * from t use index(c);", errno.ErrKeyDoesNotExist) + tk.MustExec("admin check table t;") +} + type cancelOnceHook struct { store kv.Storage triggered bool diff --git a/ddl/rollingback.go b/ddl/rollingback.go index e13a73742bc89..734b1f1549c62 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -77,9 +77,9 @@ func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblIn return ver, errors.Trace(err) } -// convertNotStartAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob, +// convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob, // to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started. -func convertNotStartAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) { +func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) { schemaID := job.SchemaID tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID) if err != nil { @@ -228,19 +228,31 @@ func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) { } func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) { - // If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes. - if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 { + if needNotifyAndStopReorg(job) { // add index workers are started. need to ask them to exit. logutil.Logger(w.logCtx).Info("[ddl] run the cancelling DDL job", zap.String("job", job.String())) d.notifyReorgCancel(job) ver, err = w.onCreateIndex(d, t, job, isPK) } else { - // add index workers are not started, remove the indexInfo in tableInfo. - ver, err = convertNotStartAddIdxJob2RollbackJob(d, t, job, dbterror.ErrCancelledDDLJob) + // add index's reorg workers are not running, remove the indexInfo in tableInfo. + ver, err = convertNotReorgAddIdxJob2RollbackJob(d, t, job, dbterror.ErrCancelledDDLJob) } return } +func needNotifyAndStopReorg(job *model.Job) bool { + if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 { + // If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes. + if job.IsNonRevertibleSubJob() { + // However, if the sub-job is non-revertible, it means the reorg process is finished. + // We don't need to start another round to notify reorg workers to exit. + return false + } + return true + } + return false +} + func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) { addingDefinitions := tblInfo.Partition.AddingDefinitions partNames := make([]string, 0, len(addingDefinitions)) diff --git a/ddl/sanity_check.go b/ddl/sanity_check.go index 9f0f540b20793..e8b417583c095 100644 --- a/ddl/sanity_check.go +++ b/ddl/sanity_check.go @@ -79,6 +79,10 @@ func queryDeleteRangeCnt(sessPool *sessionPool, jobID int64) (int, error) { } func expectedDeleteRangeCnt(job *model.Job) (int, error) { + if job.State == model.JobStateCancelled { + // Cancelled job should not have any delete range. + return 0, nil + } switch job.Type { case model.ActionDropSchema: var tableIDs []int64 @@ -101,6 +105,10 @@ func expectedDeleteRangeCnt(job *model.Job) (int, error) { } return len(physicalTableIDs), nil case model.ActionAddIndex, model.ActionAddPrimaryKey: + hasDelRange := job.State == model.JobStateRollbackDone + if !hasDelRange { + return 0, nil + } var indexID int64 var ifExists bool var partitionIDs []int64 diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 8d2a80c64ccdf..675d2a20bb144 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -435,6 +435,14 @@ func (job *Job) MarkNonRevertible() { } } +// IsNonRevertibleSubJob returns true if the job is a sub-job, and it is non-revertible. +func (job *Job) IsNonRevertibleSubJob() bool { + if job.MultiSchemaInfo != nil { + return !job.MultiSchemaInfo.Revertible + } + return false +} + // Clone returns a copy of the job. func (job *Job) Clone() *Job { encode, err := job.Encode(true) From 8679027f1f60f092bd6f912ce3080793ff877112 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 6 Jul 2022 17:58:12 +0800 Subject: [PATCH 2/8] ignore static check --- ddl/ddl_worker.go | 2 ++ ddl/rollingback.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index b0e31586426a6..8e7245491e2de 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -426,6 +426,8 @@ func needToUpdateRawArgs(job *model.Job, meetErr bool) bool { if meetErr && job.RawArgs != nil && job.Args == nil { // If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs, // so we shouldn't replace RawArgs with the marshaling Args. + //nolint:staticcheck + //lint:ignore S1008 if job.MultiSchemaInfo != nil { // However, for multi-schema change, the args of the parent job is always nil. // Since Job.Encode() can handle the sub-jobs properly, we can safely update the raw args. diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 734b1f1549c62..69f9906ffa7de 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -243,6 +243,8 @@ func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isP func needNotifyAndStopReorg(job *model.Job) bool { if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 { // If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes. + //nolint:staticcheck + //lint:ignore S1008 if job.IsNonRevertibleSubJob() { // However, if the sub-job is non-revertible, it means the reorg process is finished. // We don't need to start another round to notify reorg workers to exit. From a307caa137ca71cf8c5b4add0adb1c391688cd11 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 6 Jul 2022 20:17:01 +0800 Subject: [PATCH 3/8] fix linter --- ddl/ddl_worker.go | 19 +++++++------------ ddl/rollingback.go | 12 +++++------- parser/model/ddl.go | 8 -------- 3 files changed, 12 insertions(+), 27 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 8e7245491e2de..8f4df40abab84 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -414,7 +414,7 @@ func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error failpoint.Return(kv.ErrEntryTooLarge) } }) - updateRawArgs := needToUpdateRawArgs(job, meetErr) + updateRawArgs := needUpdateRawArgs(job, meetErr) if !updateRawArgs { logutil.Logger(w.logCtx).Info("[ddl] meet something wrong before update DDL job, shouldn't update raw args", zap.String("job", job.String())) @@ -422,18 +422,13 @@ func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error return errors.Trace(t.UpdateDDLJob(0, job, updateRawArgs)) } -func needToUpdateRawArgs(job *model.Job, meetErr bool) bool { +func needUpdateRawArgs(job *model.Job, meetErr bool) bool { + // If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs, + // we shouldn't replace RawArgs with the marshaling Args. if meetErr && job.RawArgs != nil && job.Args == nil { - // If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs, - // so we shouldn't replace RawArgs with the marshaling Args. - //nolint:staticcheck - //lint:ignore S1008 - if job.MultiSchemaInfo != nil { - // However, for multi-schema change, the args of the parent job is always nil. - // Since Job.Encode() can handle the sub-jobs properly, we can safely update the raw args. - return true - } - return false + // However, for multi-schema change, the args of the parent job is always nil. + // Since Job.Encode() can handle the sub-jobs properly, we can safely update the raw args. + return job.MultiSchemaInfo != nil } return true } diff --git a/ddl/rollingback.go b/ddl/rollingback.go index 69f9906ffa7de..d1a5f49ca843e 100644 --- a/ddl/rollingback.go +++ b/ddl/rollingback.go @@ -228,7 +228,7 @@ func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) { } func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) { - if needNotifyAndStopReorg(job) { + if needNotifyAndStopReorgWorker(job) { // add index workers are started. need to ask them to exit. logutil.Logger(w.logCtx).Info("[ddl] run the cancelling DDL job", zap.String("job", job.String())) d.notifyReorgCancel(job) @@ -240,15 +240,13 @@ func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isP return } -func needNotifyAndStopReorg(job *model.Job) bool { +func needNotifyAndStopReorgWorker(job *model.Job) bool { if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 { - // If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes. - //nolint:staticcheck - //lint:ignore S1008 - if job.IsNonRevertibleSubJob() { + // If the value of SnapshotVer isn't zero, it means the worker is backfilling the indexes. + if job.MultiSchemaInfo != nil { // However, if the sub-job is non-revertible, it means the reorg process is finished. // We don't need to start another round to notify reorg workers to exit. - return false + return job.MultiSchemaInfo.Revertible } return true } diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 675d2a20bb144..8d2a80c64ccdf 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -435,14 +435,6 @@ func (job *Job) MarkNonRevertible() { } } -// IsNonRevertibleSubJob returns true if the job is a sub-job, and it is non-revertible. -func (job *Job) IsNonRevertibleSubJob() bool { - if job.MultiSchemaInfo != nil { - return !job.MultiSchemaInfo.Revertible - } - return false -} - // Clone returns a copy of the job. func (job *Job) Clone() *Job { encode, err := job.Encode(true) From 47ab5e7a731c5fd45b112173055240d9cb6981e0 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 7 Jul 2022 12:06:21 +0800 Subject: [PATCH 4/8] fix typo --- ddl/index.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 797195b4cb66e..3bb702d250ee2 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -490,10 +490,8 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo if indexInfo == nil { if len(hiddenCols) > 0 { - if len(hiddenCols) > 0 { - for _, hiddenCol := range hiddenCols { - initAndAddColumnToTable(tblInfo, hiddenCol) - } + for _, hiddenCol := range hiddenCols { + initAndAddColumnToTable(tblInfo, hiddenCol) } } if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil { From b516e0a359a74ff5de5ea496d2073d3285db377e Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 7 Jul 2022 15:09:27 +0800 Subject: [PATCH 5/8] Update ddl/index.go Co-authored-by: Lynn --- ddl/index.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 3bb702d250ee2..047191b03223e 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -625,9 +625,9 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) if done { job.MarkNonRevertible() - done = false // We need another round to wait for all the others sub-jobs to finish. } - return done, ver, err + // We need another round to wait for all the others sub-jobs to finish. + return false, ver, err } return true, ver, err } From 6f1a909478df79d69c8ce40048acda0cd79ab70c Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 7 Jul 2022 16:20:05 +0800 Subject: [PATCH 6/8] address comment --- ddl/multi_schema_change_test.go | 43 +++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 34dac87195a0e..27be0874514a8 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -308,12 +309,14 @@ func TestMultiSchemaChangeAddIndexes(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), add index t(b)", errno.ErrUnsupportedDDLOperation) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) // Test add indexes with drop column. tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), drop column a", errno.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t add index t(a, b), drop column a", errno.ErrUnsupportedDDLOperation) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) // Test add index failed. tk.MustExec("drop table if exists t;") @@ -477,6 +480,46 @@ func TestMultiSchemaChangeAddDropIndexes(t *testing.T) { tk.MustExec("admin check table t;") } +func TestMultiSchemaChangeWithExpressionIndex(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("create table t (a int, b int);") + tk.MustExec("insert into t values (1, 2), (2, 1);") + tk.MustGetErrCode("alter table t drop column a, add unique index idx((a + b));", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t add column c int, change column a d bigint, add index idx((a + a));", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t add column c int default 10, add index idx1((a + b)), add unique index idx2((a + b));", + errno.ErrDupEntry) + tk.MustQuery("select * from t;").Check(testkit.Rows("1 2", "2 1")) + + originHook := dom.DDL().GetHook() + hook := &ddl.TestDDLCallback{Do: dom} + var checkErr error + hook.OnJobRunBeforeExported = func(job *model.Job) { + assert.Equal(t, model.ActionMultiSchemaChange, job.Type) + if job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StateWriteOnly { + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test;") + _, checkErr = tk2.Exec("update t set a = 3 where a = 1;") + if checkErr != nil { + return + } + _, checkErr = tk2.Exec("insert into t values (10, 10);") + } + } + dom.DDL().SetHook(hook) + tk.MustExec("alter table t add column c int default 10, add index idx1((a + b)), add unique index idx2((a + b));") + require.NoError(t, checkErr) + dom.DDL().SetHook(originHook) + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a int, b int);") + tk.MustExec("insert into t values (1, 2), (2, 1);") + tk.MustExec("alter table t add column c int default 10, add index idx1((a + b)), add unique index idx2((a*10 + b));") + tk.MustQuery("select * from t use index(idx1, idx2);").Check(testkit.Rows("1 2 10", "2 1 10")) +} + type cancelOnceHook struct { store kv.Storage triggered bool From c0d5e476975fdcd6dd758a03f54f2e7083529455 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 7 Jul 2022 16:38:33 +0800 Subject: [PATCH 7/8] skip the hook if checkErr is not nil --- ddl/multi_schema_change_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 27be0874514a8..2c0f45d4543dd 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -309,14 +309,14 @@ func TestMultiSchemaChangeAddIndexes(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), add index t(b)", errno.ErrUnsupportedDDLOperation) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) // Test add indexes with drop column. tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), drop column a", errno.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t add index t(a, b), drop column a", errno.ErrUnsupportedDDLOperation) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) // Test add index failed. tk.MustExec("drop table if exists t;") @@ -324,7 +324,7 @@ func TestMultiSchemaChangeAddIndexes(t *testing.T) { tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 1);") tk.MustGetErrCode("alter table t add unique index i1(a), add unique index i2(a, b), add unique index i3(c);", errno.ErrDupEntry) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) tk.MustExec("alter table t add index i1(a), add index i2(a, b), add index i3(c);") } @@ -349,7 +349,7 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) { "add index t2(a), add index t3(a, b);", errno.ErrCancelledDDLJob) dom.DDL().SetHook(originHook) cancelHook.MustCancelDone(t) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) tk.MustExec("admin check table t;") @@ -497,6 +497,9 @@ func TestMultiSchemaChangeWithExpressionIndex(t *testing.T) { hook := &ddl.TestDDLCallback{Do: dom} var checkErr error hook.OnJobRunBeforeExported = func(job *model.Job) { + if checkErr != nil { + return + } assert.Equal(t, model.ActionMultiSchemaChange, job.Type) if job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StateWriteOnly { tk2 := testkit.NewTestKit(t, store) From 921005aced3eea1f6d8da59c9973c99e08f5e986 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 7 Jul 2022 16:57:47 +0800 Subject: [PATCH 8/8] make fmt --- ddl/multi_schema_change_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 2c0f45d4543dd..f07e8340b2659 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -309,14 +309,14 @@ func TestMultiSchemaChangeAddIndexes(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), add index t(b)", errno.ErrUnsupportedDDLOperation) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) // Test add indexes with drop column. tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int)") tk.MustGetErrCode("alter table t add index t(a), drop column a", errno.ErrUnsupportedDDLOperation) tk.MustGetErrCode("alter table t add index t(a, b), drop column a", errno.ErrUnsupportedDDLOperation) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) // Test add index failed. tk.MustExec("drop table if exists t;") @@ -324,7 +324,7 @@ func TestMultiSchemaChangeAddIndexes(t *testing.T) { tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 1);") tk.MustGetErrCode("alter table t add unique index i1(a), add unique index i2(a, b), add unique index i3(c);", errno.ErrDupEntry) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) tk.MustExec("alter table t add index i1(a), add index i2(a, b), add index i3(c);") } @@ -349,7 +349,7 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) { "add index t2(a), add index t3(a, b);", errno.ErrCancelledDDLJob) dom.DDL().SetHook(originHook) cancelHook.MustCancelDone(t) - tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */)) + tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ )) tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3")) tk.MustExec("admin check table t;")