Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix the integration tests #27

Merged
merged 9 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
// Store the mark and enter the next DDL handling loop.
return updateVersionAndTableInfoWithCheck(t, job, tblInfo, false)
}

Expand Down
7 changes: 3 additions & 4 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func TestIssue5092(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
// The following two statements are consistent with MariaDB.
tk.MustGetErrCode("alter table t_issue_5092 add column if not exists d int, add column d int", errno.ErrDupFieldName)
tk.MustExec("alter table t_issue_5092 add column dd int, add column if not exists dd int")
tk.MustGetErrCode("alter table t_issue_5092 add column dd int, add column if not exists dd int", errno.ErrUnsupportedDDLOperation)
tk.MustExec("alter table t_issue_5092 add column if not exists (d int, e int), add column ff text")
tk.MustExec("alter table t_issue_5092 add column b2 int after b1, add column c2 int first")
tk.MustQuery("show create table t_issue_5092").Check(testkit.Rows("t_issue_5092 CREATE TABLE `t_issue_5092` (\n" +
Expand All @@ -420,7 +420,6 @@ func TestIssue5092(t *testing.T) {
" `c1` int(11) DEFAULT NULL,\n" +
" `f` int(11) DEFAULT NULL,\n" +
" `g` int(11) DEFAULT NULL,\n" +
" `dd` int(11) DEFAULT NULL,\n" +
" `ff` text DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("drop table t_issue_5092")
Expand Down Expand Up @@ -454,8 +453,8 @@ func TestIssue5092(t *testing.T) {
tk.MustExec("create table t_issue_5092 (a int)")
tk.MustExec("alter table t_issue_5092 add column (b int, c int)")
tk.MustGetErrCode("alter table t_issue_5092 drop column if exists a, drop column b, drop column c", errno.ErrCantRemoveAllFields)
tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrCantDropFieldOrKey)
tk.MustExec("alter table t_issue_5092 drop column c, drop column if exists c")
tk.MustGetErrCode("alter table t_issue_5092 drop column if exists c, drop column c", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t_issue_5092 drop column c, drop column if exists c", errno.ErrUnsupportedDDLOperation)
tk.MustExec("drop table t_issue_5092")
}

Expand Down
6 changes: 2 additions & 4 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/pingcap/tidb/table"
goutil "github.com/pingcap/tidb/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -714,15 +713,14 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns ||
historyJob.Type == model.ActionDropIndexes ||
historyJob.Type == model.ActionMultiSchemaChange) {
historyJob.Type == model.ActionDropIndexes) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
logutil.BgLogger().Info("[ddl] DDL job is cancelled", zap.Int64("jobID", jobID))
return dbterror.ErrCancelledDDLJob
return nil
}
panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
}
Expand Down
12 changes: 8 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4069,14 +4069,18 @@ func checkIsDroppableColumn(ctx sessionctx.Context, t table.Table, spec *ast.Alt

func checkVisibleColumnCnt(t table.Table, addCnt, dropCnt int) error {
tblInfo := t.Meta()
visibleColumCnt := addCnt
visibleColumCnt := 0
for _, column := range tblInfo.Columns {
if !column.Hidden {
visibleColumCnt++
}
if visibleColumCnt > dropCnt {
return nil
}
}
if visibleColumCnt+addCnt > dropCnt {
return nil
}
if len(tblInfo.Columns)-visibleColumCnt > 0 {
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
// There are only invisible columns.
return dbterror.ErrTableMustHaveColumns
}
return dbterror.ErrCantRemoveAllFields
}
Expand Down
4 changes: 1 addition & 3 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,9 +785,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
}
// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
if job.Type != model.ActionMultiSchemaChange {
return convertJob2RollbackJob(w, d, t, job)
}
return convertJob2RollbackJob(w, d, t, job)
}

if !job.IsRollingback() && !job.IsCancelling() {
Expand Down
17 changes: 10 additions & 7 deletions ddl/index_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/ddl"
testddlutil "github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
Expand Down Expand Up @@ -1098,19 +1099,21 @@ func testDropIndexesIfExists(t *testing.T, store kv.Storage) {
"[ddl:1091]index i3 doesn't exist",
)
tk.MustExec("alter table test_drop_indexes_if_exists drop index i1, drop index if exists i3;")
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i3 doesn't exist"))
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Note|1091|index i3 doesn't exist"))

// Verify the impact of deletion order when dropping duplicate indexes.
tk.MustGetErrMsg(
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
errno.ErrUnsupportedDDLOperation,
)
tk.MustGetErrMsg(
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index if exists i2, drop index i2;",
"[ddl:1091]index i2 doesn't exist",
errno.ErrUnsupportedDDLOperation,
)
tk.MustGetErrCode(
"alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;",
errno.ErrUnsupportedDDLOperation,
)
tk.MustExec("alter table test_drop_indexes_if_exists drop index i2, drop index if exists i2;")
tk.MustQuery("show warnings;").Check(testkit.RowsWithSep("|", "Warning|1091|index i2 doesn't exist"))
}

func testDropIndexesFromPartitionedTable(t *testing.T, store kv.Storage) {
Expand Down
35 changes: 21 additions & 14 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/dbterror"
)

func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
if job.MultiSchemaInfo.Revertible {
// Handle the rolling back job.
if job.IsRollingback() || job.IsCancelling() {
if job.IsRollingback() {
// Rollback/cancel the sub-jobs in reverse order.
for i := len(job.MultiSchemaInfo.SubJobs) - 1; i >= 0; i-- {
sub := job.MultiSchemaInfo.SubJobs[i]
Expand All @@ -37,17 +38,14 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
proxyJob := cloneFromSubJob(job, sub)
ver, err = w.runDDLJob(d, t, proxyJob)
mergeBackToSubJob(proxyJob, sub)
if i == 0 {
// The last rollback/cancelling sub-job is done.
if job.IsRollingback() {
job.State = model.JobStateRollbackDone
}
if job.IsCancelling() {
job.State = model.JobStateCancelled
}
if i == 0 && isFinished(sub) {
job.State = model.JobStateRollbackDone
}
return ver, err
}
// The last rollback/cancelling sub-job is done.
job.State = model.JobStateRollbackDone
return ver, nil
}

// The sub-jobs are normally running.
Expand All @@ -60,8 +58,8 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := cloneFromSubJob(job, sub)
ver, err = w.runDDLJob(d, t, proxyJob)
handleRevertibleException(job, sub.State, i)
mergeBackToSubJob(proxyJob, sub)
handleRevertibleException(job, sub, i, proxyJob.Error)
return ver, err
}
// All the sub-jobs are non-revertible.
Expand Down Expand Up @@ -101,7 +99,7 @@ func cloneFromSubJob(job *model.Job, sub *model.SubJob) *model.Job {
SchemaID: job.SchemaID,
TableID: job.TableID,
SchemaName: job.SchemaName,
State: job.State,
State: sub.State,
Error: nil,
ErrorCount: 0,
RowCount: 0,
Expand Down Expand Up @@ -132,10 +130,12 @@ func mergeBackToSubJob(job *model.Job, sub *model.SubJob) {
sub.State = job.State
}

func handleRevertibleException(job *model.Job, res model.JobState, idx int) {
if res == model.JobStateRollingback || res == model.JobStateCancelling {
job.State = res
func handleRevertibleException(job *model.Job, subJob *model.SubJob, idx int, err *terror.Error) {
if !isAbnormal(subJob) {
return
}
job.State = model.JobStateRollingback
job.Error = err
// Flush the cancelling state and cancelled state to sub-jobs.
for i, sub := range job.MultiSchemaInfo.SubJobs {
if i < idx {
Expand All @@ -147,6 +147,13 @@ func handleRevertibleException(job *model.Job, res model.JobState, idx int) {
}
}

func isAbnormal(job *model.SubJob) bool {
return job.State == model.JobStateCancelling ||
job.State == model.JobStateCancelled ||
job.State == model.JobStateRollingback ||
job.State == model.JobStateRollbackDone
}

func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error) {
switch job.Type {
case model.ActionAddColumn:
Expand Down
5 changes: 5 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ func TestMultiSchemaChangeAddColumns(t *testing.T) {
tk.MustExec("insert into t values (1);")
tk.MustExec("alter table t add column (b int default 2, c int default 3);")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3"))
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t add column (d int default 4, e int default 5);")
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3 4 5"))

// Test referencing previous column in multi-schema change is not supported.
tk.MustExec("drop table if exists t;")
Expand Down
22 changes: 22 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,26 @@ func rollingbackTruncateTable(t *meta.Meta, job *model.Job) (ver int64, err erro
return cancelOnlyNotHandledJob(job)
}

func rollingBackMultiSchemaChange(job *model.Job) error {
if !job.MultiSchemaInfo.Revertible {
// Cannot rolling back because the jobs are non-revertible.
// Resume the job state to running.
job.State = model.JobStateRunning
return nil
}
// Mark all the jobs to cancelling.
for _, sub := range job.MultiSchemaInfo.SubJobs {
switch sub.State {
case model.JobStateRunning:
sub.State = model.JobStateCancelling
case model.JobStateNone:
sub.State = model.JobStateCancelled
}
}
job.State = model.JobStateRollingback
return dbterror.ErrCancelledDDLJob
}

func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
switch job.Type {
case model.ActionAddColumn:
Expand Down Expand Up @@ -479,6 +499,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
model.ActionModifyTableAutoIdCache, model.ActionAlterIndexVisibility,
model.ActionExchangeTablePartition, model.ActionModifySchemaDefaultPlacement:
ver, err = cancelOnlyNotHandledJob(job)
case model.ActionMultiSchemaChange:
err = rollingBackMultiSchemaChange(job)
default:
job.State = model.JobStateCancelled
err = dbterror.ErrCancelledDDLJob
Expand Down
2 changes: 1 addition & 1 deletion executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,9 +875,9 @@ func TestAsOfTimestampCompatibility(t *testing.T) {
tk.MustExec("commit")
}
tk.MustExec(`create table test.table1 (id int primary key, a int);`)
defer tk.MustExec("drop table if exists test.table1;")
time1 = time.Now()
tk.MustExec(fmt.Sprintf("explain analyze select * from test.table1 as of timestamp '%s' where id = 1;", time1.Format("2006-1-2 15:04:05.000")))
tk.MustExec("drop table if exists test.table1;")
}

func TestSetTransactionInfoSchema(t *testing.T) {
Expand Down