From 9e2a4d50ecaeb5b1ce8c15ec9a4d9981d7d23fd3 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 11 Apr 2022 11:53:38 +0800 Subject: [PATCH 1/2] ddl: fix inappropriate schema state output by admin show ddl jobs --- ddl/column_type_change_test.go | 2 +- ddl/ddl_worker.go | 5 +++++ parser/model/ddl.go | 12 ++++++++++++ parser/model/model.go | 2 +- parser/model/model_test.go | 2 +- util/admin/admin_test.go | 2 +- 6 files changed, 21 insertions(+), 4 deletions(-) diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 003bcc59b995c..4d29f0ad18b8b 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -199,7 +199,7 @@ func TestRollbackColumnTypeChangeBetweenInteger(t *testing.T) { // Alter sql will modify column c2 to bigint not null. SQL := "alter table t modify column c2 int not null" err := tk.ExecToErr(SQL) - require.EqualError(t, err, "[ddl:1]MockRollingBackInCallBack-queueing") + require.EqualError(t, err, "[ddl:1]MockRollingBackInCallBack-none") assertRollBackedColUnchanged(t, tk) // Mock roll back at model.StateDeleteOnly. diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 4ec29dcc50b3d..ab2234528e9ab 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -301,6 +301,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) { job.Version = currentVersion job.StartTS = txn.StartTS() job.ID = ids[i] + job.State = model.JobStateQueueing if err = buildJobDependence(t, job); err != nil { return errors.Trace(err) } @@ -623,6 +624,10 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { return errors.Trace(err) } + if job.IsQueueing() { + job.State = model.JobStateNone + } + d.mu.RLock() d.mu.hook.OnJobRunBefore(job) d.mu.RUnlock() diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 5ea3790143baf..ead6eedeeddf1 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -504,6 +504,14 @@ func (job *Job) IsRunning() bool { return job.State == JobStateRunning } +func (job *Job) IsQueueing() bool { + return job.State == JobStateQueueing +} + +func (job *Job) NotStarted() bool { + return job.State == JobStateNone || job.State == JobStateQueueing +} + // JobState is for job state. type JobState byte @@ -523,6 +531,8 @@ const ( JobStateSynced JobState = 6 // JobStateCancelling is used to mark the DDL job is cancelled by the client, but the DDL work hasn't handle it. JobStateCancelling JobState = 7 + // JobStateQueueing means the job has not yet been started. + JobStateQueueing JobState = 8 ) // String implements fmt.Stringer interface. @@ -542,6 +552,8 @@ func (s JobState) String() string { return "cancelling" case JobStateSynced: return "synced" + case JobStateQueueing: + return "queueing" default: return "none" } diff --git a/parser/model/model.go b/parser/model/model.go index 64d9cc98bc9ec..fb50c87e8f02f 100644 --- a/parser/model/model.go +++ b/parser/model/model.go @@ -71,7 +71,7 @@ func (s SchemaState) String() string { case StateGlobalTxnOnly: return "global txn only" default: - return "queueing" + return "none" } } diff --git a/parser/model/model_test.go b/parser/model/model_test.go index 7e594e245a5dc..0ae06feeebb78 100644 --- a/parser/model/model_test.go +++ b/parser/model/model_test.go @@ -222,7 +222,7 @@ func TestJobStartTime(t *testing.T) { BinlogInfo: &HistoryInfo{}, } require.Equal(t, TSConvert2Time(job.StartTS), time.Unix(0, 0)) - require.Equal(t, fmt.Sprintf("ID:123, Type:none, State:none, SchemaState:queueing, SchemaID:0, TableID:0, RowCount:0, ArgLen:0, start time: %s, Err:, ErrCount:0, SnapshotVersion:0", time.Unix(0, 0)), job.String()) + require.Equal(t, fmt.Sprintf("ID:123, Type:none, State:none, SchemaState:none, SchemaID:0, TableID:0, RowCount:0, ArgLen:0, start time: %s, Err:, ErrCount:0, SnapshotVersion:0", time.Unix(0, 0)), job.String()) } func TestJobCodec(t *testing.T) { diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go index 591e741e6689b..de8d63188fa45 100644 --- a/util/admin/admin_test.go +++ b/util/admin/admin_test.go @@ -103,7 +103,7 @@ func TestGetDDLJobs(t *testing.T) { currJobs2 = currJobs2[:0] err = IterAllDDLJobs(txn, func(jobs []*model.Job) (b bool, e error) { for _, job := range jobs { - if job.State == model.JobStateNone { + if job.NotStarted() { currJobs2 = append(currJobs2, job) } else { return true, nil From f059d566ea68ab19f60519993ce42acf09eb9eb9 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 11 Apr 2022 14:37:52 +0800 Subject: [PATCH 2/2] address comment --- ddl/column_modify_test.go | 4 ++-- ddl/db_rename_test.go | 2 +- ddl/db_table_test.go | 8 ++++---- ddl/ddl_worker.go | 4 ---- ddl/index_modify_test.go | 2 +- 5 files changed, 8 insertions(+), 12 deletions(-) diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go index 12cd8296682ac..b84b9f948c517 100644 --- a/ddl/column_modify_test.go +++ b/ddl/column_modify_test.go @@ -456,7 +456,7 @@ func TestCancelDropColumn(t *testing.T) { JobSchemaState model.SchemaState cancelSucc bool }{ - {true, model.JobStateNone, model.StateNone, true}, + {true, model.JobStateQueueing, model.StateNone, true}, {false, model.JobStateRunning, model.StateWriteOnly, false}, {true, model.JobStateRunning, model.StateDeleteOnly, false}, {true, model.JobStateRunning, model.StateDeleteReorganization, false}, @@ -560,7 +560,7 @@ func TestCancelDropColumns(t *testing.T) { JobSchemaState model.SchemaState cancelSucc bool }{ - {true, model.JobStateNone, model.StateNone, true}, + {true, model.JobStateQueueing, model.StateNone, true}, {false, model.JobStateRunning, model.StateWriteOnly, false}, {true, model.JobStateRunning, model.StateDeleteOnly, false}, {true, model.JobStateRunning, model.StateDeleteReorganization, false}, diff --git a/ddl/db_rename_test.go b/ddl/db_rename_test.go index 861efe3257225..b84d22e63a31c 100644 --- a/ddl/db_rename_test.go +++ b/ddl/db_rename_test.go @@ -75,7 +75,7 @@ func TestCancelRenameIndex(t *testing.T) { var checkErr error hook := &ddl.TestDDLCallback{Do: dom} hook.OnJobRunBeforeExported = func(job *model.Job) { - if job.Type == model.ActionRenameIndex && job.State == model.JobStateNone { + if job.Type == model.ActionRenameIndex && job.State == model.JobStateQueueing { jobIDs := []int64{job.ID} hookCtx := mock.NewContext() hookCtx.Store = store diff --git a/ddl/db_table_test.go b/ddl/db_table_test.go index 13a6113ad6830..146aa01e38608 100644 --- a/ddl/db_table_test.go +++ b/ddl/db_table_test.go @@ -59,12 +59,12 @@ func TestCancelDropTableAndSchema(t *testing.T) { }{ // Check drop table. // model.JobStateNone means the jobs is canceled before the first run. - {true, model.ActionDropTable, model.JobStateNone, model.StateNone, true}, + {true, model.ActionDropTable, model.JobStateQueueing, model.StateNone, true}, {false, model.ActionDropTable, model.JobStateRunning, model.StateWriteOnly, false}, {true, model.ActionDropTable, model.JobStateRunning, model.StateDeleteOnly, false}, // Check drop database. - {true, model.ActionDropSchema, model.JobStateNone, model.StateNone, true}, + {true, model.ActionDropSchema, model.JobStateQueueing, model.StateNone, true}, {false, model.ActionDropSchema, model.JobStateRunning, model.StateWriteOnly, false}, {true, model.ActionDropSchema, model.JobStateRunning, model.StateDeleteOnly, false}, } @@ -437,8 +437,8 @@ func TestCancelAddTableAndDropTablePartition(t *testing.T) { JobSchemaState model.SchemaState cancelSucc bool }{ - {model.ActionAddTablePartition, model.JobStateNone, model.StateNone, true}, - {model.ActionDropTablePartition, model.JobStateNone, model.StateNone, true}, + {model.ActionAddTablePartition, model.JobStateQueueing, model.StateNone, true}, + {model.ActionDropTablePartition, model.JobStateQueueing, model.StateNone, true}, // Add table partition now can be cancelled in ReplicaOnly state. {model.ActionAddTablePartition, model.JobStateRunning, model.StateReplicaOnly, true}, } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index ab2234528e9ab..6bbf0beae14ee 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -624,10 +624,6 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error { return errors.Trace(err) } - if job.IsQueueing() { - job.State = model.JobStateNone - } - d.mu.RLock() d.mu.hook.OnJobRunBefore(job) d.mu.RUnlock() diff --git a/ddl/index_modify_test.go b/ddl/index_modify_test.go index 586e9c8ffa7c8..0066bac4a8391 100644 --- a/ddl/index_modify_test.go +++ b/ddl/index_modify_test.go @@ -1151,7 +1151,7 @@ func testCancelDropIndexes(t *testing.T, store kv.Storage, d ddl.DDL) { }{ // model.JobStateNone means the jobs is canceled before the first run. // if we cancel successfully, we need to set needAddIndex to false in the next test case. Otherwise, set needAddIndex to true. - {true, model.JobStateNone, model.StateNone, true}, + {true, model.JobStateQueueing, model.StateNone, true}, {false, model.JobStateRunning, model.StateWriteOnly, false}, {true, model.JobStateRunning, model.StateDeleteOnly, false}, {true, model.JobStateRunning, model.StateDeleteReorganization, false},