Skip to content

Commit

Permalink
ddl: fix the integration tests (pingcap#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Mar 16, 2022
1 parent 67eeacc commit d4b1f7d
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 37 deletions.
1 change: 1 addition & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
// Store the mark and enter the next DDL handling loop.
return updateVersionAndTableInfoWithCheck(t, job, tblInfo, false)
}

Expand Down
7 changes: 3 additions & 4 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func TestIssue5092(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
// The following two statements are consistent with MariaDB.
tk.MustGetErrCode("alter table t_issue_5092 add column if not exists d int, add column d int", errno.ErrDupFieldName)
tk.MustExec("alter table t_issue_5092 add column dd int, add column if not exists dd int")
tk.MustGetErrCode("alter table t_issue_5092 add column dd int, add column if not exists dd int", errno.ErrUnsupportedDDLOperation)
tk.MustExec("alter table t_issue_5092 add column if not exists (d int, e int), add column ff text")
tk.MustExec("alter table t_issue_5092 add column b2 int after b1, add column c2 int first")
tk.MustQuery("show create table t_issue_5092").Check(testkit.Rows("t_issue_5092 CREATE TABLE `t_issue_5092` (\n" +
Expand All @@ -420,7 +420,6 @@ func TestIssue5092(t *testing.T) {
" `c1` int(11) DEFAULT NULL,\n" +
" `f` int(11) DEFAULT NULL,\n" +
" `g` int(11) DEFAULT NULL,\n" +
" `dd` int(11) DEFAULT NULL,\n" +
" `ff` text DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("drop table t_issue_5092")
Expand Down Expand Up @@ -454,8 +453,8 @@ func TestIssue5092(t *testing.T) {
tk.MustExec("create table t_issue_5092 (a int)")
tk.MustExec("alter table t_issue_5092 add column (b int, c int)")
tk.MustGetErrCode("alter table t_issue_5092 drop column if exists a, drop column b, drop column c", errno.ErrCantRemoveAllFields)
tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrCantDropFieldOrKey)
tk.MustExec("alter table t_issue_5092 drop column c, drop column if exists c")
tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t_issue_5092 drop column c, drop column if exists c", errno.ErrUnsupportedDDLOperation)
tk.MustExec("drop table t_issue_5092")
}

Expand Down
6 changes: 2 additions & 4 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/pingcap/tidb/table"
goutil "github.com/pingcap/tidb/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -714,15 +713,14 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns ||
historyJob.Type == model.ActionDropIndexes ||
historyJob.Type == model.ActionMultiSchemaChange) {
historyJob.Type == model.ActionDropIndexes) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
logutil.BgLogger().Info("[ddl] DDL job is cancelled", zap.Int64("jobID", jobID))
return dbterror.ErrCancelledDDLJob
return nil
}
panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
}
Expand Down
12 changes: 8 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4069,14 +4069,18 @@ func checkIsDroppableColumn(ctx sessionctx.Context, t table.Table, spec *ast.Alt

func checkVisibleColumnCnt(t table.Table, addCnt, dropCnt int) error {
tblInfo := t.Meta()
visibleColumCnt := addCnt
visibleColumCnt := 0
for _, column := range tblInfo.Columns {
if !column.Hidden {
visibleColumCnt++
}
if visibleColumCnt > dropCnt {
return nil
}
}
if visibleColumCnt+addCnt > dropCnt {
return nil
}
if len(tblInfo.Columns)-visibleColumCnt > 0 {
// There are only invisible columns.
return dbterror.ErrTableMustHaveColumns
}
return dbterror.ErrCantRemoveAllFields
}
Expand Down
4 changes: 1 addition & 3 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,9 +785,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
}
// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
if job.Type != model.ActionMultiSchemaChange {
return convertJob2RollbackJob(w, d, t, job)
}
return convertJob2RollbackJob(w, d, t, job)
}

if !job.IsRollingback() && !job.IsCancelling() {
Expand Down
17 changes: 10 additions & 7 deletions ddl/index_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/ddl"
testddlutil "github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -1098,19 +1099,21 @@ func testDropIndexesIfExists(t *testing.T, store kv.Storage) {
"[ddl:1091]index i3 doesn't exist",
)
tk.MustExec("alter table test_drop_indexes_if_exists drop index i1, drop index if exists i3;")
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i3 doesn't exist"))
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i3 doesn't exist"))

// Verify the impact of deletion order when dropping duplicate indexes.
tk.MustGetErrMsg(
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
errno.ErrUnsupportedDDLOperation,
)
tk.MustGetErrMsg(
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index if exists i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
errno.ErrUnsupportedDDLOperation,
)
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;",
errno.ErrUnsupportedDDLOperation,
)
tk.MustExec("alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;")
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i2 doesn't exist"))
}

func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) {
Expand Down
35 changes: 21 additions & 14 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/dbterror"
)

func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
if job.MultiSchemaInfo.Revertible {
// Handle the rolling back job.
if job.IsRollingback() || job.IsCancelling() {
if job.IsRollingback() {
// Rollback/cancel the sub-jobs in reverse order.
for i := len(job.MultiSchemaInfo.SubJobs) - 1; i >= 0; i-- {
sub := job.MultiSchemaInfo.SubJobs[i]
Expand All @@ -37,17 +38,14 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
proxyJob := cloneFromSubJob(job, sub)
ver, err = w.runDDLJob(d, t, proxyJob)
mergeBackToSubJob(proxyJob, sub)
if i == 0 {
// The last rollback/cancelling sub-job is done.
if job.IsRollingback() {
job.State = model.JobStateRollbackDone
}
if job.IsCancelling() {
job.State = model.JobStateCancelled
}
if i == 0 && isFinished(sub) {
job.State = model.JobStateRollbackDone
}
return ver, err
}
// The last rollback/cancelling sub-job is done.
job.State = model.JobStateRollbackDone
return ver, nil
}

// The sub-jobs are normally running.
Expand All @@ -60,8 +58,8 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := cloneFromSubJob(job, sub)
ver, err = w.runDDLJob(d, t, proxyJob)
handleRevertibleException(job, sub.State, i)
mergeBackToSubJob(proxyJob, sub)
handleRevertibleException(job, sub, i, proxyJob.Error)
return ver, err
}
// All the sub-jobs are non-revertible.
Expand Down Expand Up @@ -101,7 +99,7 @@ func cloneFromSubJob(job *model.Job, sub *model.SubJob) *model.Job {
SchemaID: job.SchemaID,
TableID: job.TableID,
SchemaName: job.SchemaName,
State: job.State,
State: sub.State,
Error: nil,
ErrorCount: 0,
RowCount: 0,
Expand Down Expand Up @@ -132,10 +130,12 @@ func mergeBackToSubJob(job *model.Job, sub *model.SubJob) {
sub.State = job.State
}

func handleRevertibleException(job *model.Job, res model.JobState, idx int) {
if res == model.JobStateRollingback || res == model.JobStateCancelling {
job.State = res
func handleRevertibleException(job *model.Job, subJob *model.SubJob, idx int, err *terror.Error) {
if !isAbnormal(subJob) {
return
}
job.State = model.JobStateRollingback
job.Error = err
// Flush the cancelling state and cancelled state to sub-jobs.
for i, sub := range job.MultiSchemaInfo.SubJobs {
if i < idx {
Expand All @@ -147,6 +147,13 @@ func handleRevertibleException(job *model.Job, res model.JobState, idx int) {
}
}

func isAbnormal(job *model.SubJob) bool {
return job.State == model.JobStateCancelling ||
job.State == model.JobStateCancelled ||
job.State == model.JobStateRollingback ||
job.State == model.JobStateRollbackDone
}

func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error) {
switch job.Type {
case model.ActionAddColumn:
Expand Down
5 changes: 5 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func TestMultiSchemaChangeAddColumns(t *testing.T) {
tk.MustExec("insert into t values (1);")
tk.MustExec("alter table t add column (b int default 2, c int default 3);")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3"))
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t add column (d int default 4, e int default 5);")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3 4 5"))

// Test referencing previous column in multi-schema change is not supported.
tk.MustExec("drop table if exists t;")
Expand Down
22 changes: 22 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,26 @@ func rollingbackTruncateTable(t *meta.Meta, job *model.Job) (ver int64, err erro
return cancelOnlyNotHandledJob(job)
}

func rollingBackMultiSchemaChange(job *model.Job) error {
if !job.MultiSchemaInfo.Revertible {
// Cannot rolling back because the jobs are non-revertible.
// Resume the job state to running.
job.State = model.JobStateRunning
return nil
}
// Mark all the jobs to cancelling.
for _, sub := range job.MultiSchemaInfo.SubJobs {
switch sub.State {
case model.JobStateRunning:
sub.State = model.JobStateCancelling
case model.JobStateNone:
sub.State = model.JobStateCancelled
}
}
job.State = model.JobStateRollingback
return dbterror.ErrCancelledDDLJob
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
Expand Down Expand Up @@ -479,6 +499,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility,
model.ActionExchangeTablePartition, model.ActionModifySchemaDefaultPlacement:
ver, err = cancelOnlyNotHandledJob(job)
case model.ActionMultiSchemaChange:
err = rollingBackMultiSchemaChange(job)
default:
job.State = model.JobStateCancelled
err = dbterror.ErrCancelledDDLJob
Expand Down
2 changes: 1 addition & 1 deletion executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,9 +875,9 @@ func TestAsOfTimestampCompatibility(t *testing.T) {
tk.MustExec("commit")
}
tk.MustExec(`create table test.table1 (id int primary key, a int);`)
defer tk.MustExec("drop table if exists test.table1;")
time1 = time.Now()
tk.MustExec(fmt.Sprintf("explain analyze select * from test.table1 as of timestamp '%s' where id = 1;", time1.Format("2006-1-2 15:04:05.000")))
tk.MustExec("drop table if exists test.table1;")
}

func TestSetTransactionInfoSchema(t *testing.T) {
Expand Down

0 comments on commit d4b1f7d

Please sign in to comment.