Skip to content

Commit

Permalink
ddl: fix inappropriate schema state output by admin show ddl jobs (#3…
Browse files Browse the repository at this point in the history
…3850)

ref #23494, close #24420
  • Loading branch information
tangenta committed Apr 11, 2022
1 parent 0d7602e commit 67b0734
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 12 deletions.
4 changes: 2 additions & 2 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func TestCancelDropColumn(t *testing.T) {
JobSchemaState model.SchemaState
cancelSucc bool
}{
{true, model.JobStateNone, model.StateNone, true},
{true, model.JobStateQueueing, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, false},
{true, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
Expand Down Expand Up @@ -554,7 +554,7 @@ func TestCancelDropColumns(t *testing.T) {
JobSchemaState model.SchemaState
cancelSucc bool
}{
{true, model.JobStateNone, model.StateNone, true},
{true, model.JobStateQueueing, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, false},
{true, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
Expand Down
2 changes: 1 addition & 1 deletion ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestRollbackColumnTypeChangeBetweenInteger(t *testing.T) {
// Alter sql will modify column c2 to bigint not null.
SQL := "alter table t modify column c2 int not null"
err := tk.ExecToErr(SQL)
require.EqualError(t, err, "[ddl:1]MockRollingBackInCallBack-queueing")
require.EqualError(t, err, "[ddl:1]MockRollingBackInCallBack-none")
assertRollBackedColUnchanged(t, tk)

// Mock roll back at model.StateDeleteOnly.
Expand Down
2 changes: 1 addition & 1 deletion ddl/db_rename_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestCancelRenameIndex(t *testing.T) {
var checkErr error
hook := &ddl.TestDDLCallback{Do: dom}
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionRenameIndex && job.State == model.JobStateNone {
if job.Type == model.ActionRenameIndex && job.State == model.JobStateQueueing {
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = store
Expand Down
8 changes: 4 additions & 4 deletions ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func TestCancelDropTableAndSchema(t *testing.T) {
}{
// Check drop table.
// model.JobStateNone means the jobs is canceled before the first run.
{true, model.ActionDropTable, model.JobStateNone, model.StateNone, true},
{true, model.ActionDropTable, model.JobStateQueueing, model.StateNone, true},
{false, model.ActionDropTable, model.JobStateRunning, model.StateWriteOnly, false},
{true, model.ActionDropTable, model.JobStateRunning, model.StateDeleteOnly, false},

// Check drop database.
{true, model.ActionDropSchema, model.JobStateNone, model.StateNone, true},
{true, model.ActionDropSchema, model.JobStateQueueing, model.StateNone, true},
{false, model.ActionDropSchema, model.JobStateRunning, model.StateWriteOnly, false},
{true, model.ActionDropSchema, model.JobStateRunning, model.StateDeleteOnly, false},
}
Expand Down Expand Up @@ -437,8 +437,8 @@ func TestCancelAddTableAndDropTablePartition(t *testing.T) {
JobSchemaState model.SchemaState
cancelSucc bool
}{
{model.ActionAddTablePartition, model.JobStateNone, model.StateNone, true},
{model.ActionDropTablePartition, model.JobStateNone, model.StateNone, true},
{model.ActionAddTablePartition, model.JobStateQueueing, model.StateNone, true},
{model.ActionDropTablePartition, model.JobStateQueueing, model.StateNone, true},
// Add table partition now can be cancelled in ReplicaOnly state.
{model.ActionAddTablePartition, model.JobStateRunning, model.StateReplicaOnly, true},
}
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) {
job.Version = currentVersion
job.StartTS = txn.StartTS()
job.ID = ids[i]
job.State = model.JobStateQueueing
if err = buildJobDependence(t, job); err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/index_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ func testCancelDropIndexes(t *testing.T, store kv.Storage, d ddl.DDL) {
}{
// model.JobStateNone means the jobs is canceled before the first run.
// if we cancel successfully, we need to set needAddIndex to false in the next test case. Otherwise, set needAddIndex to true.
{true, model.JobStateNone, model.StateNone, true},
{true, model.JobStateQueueing, model.StateNone, true},
{false, model.JobStateRunning, model.StateWriteOnly, false},
{true, model.JobStateRunning, model.StateDeleteOnly, false},
{true, model.JobStateRunning, model.StateDeleteReorganization, false},
Expand Down
12 changes: 12 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,14 @@ func (job *Job) IsRunning() bool {
return job.State == JobStateRunning
}

func (job *Job) IsQueueing() bool {
return job.State == JobStateQueueing
}

func (job *Job) NotStarted() bool {
return job.State == JobStateNone || job.State == JobStateQueueing
}

// JobState is for job state.
type JobState byte

Expand All @@ -522,6 +530,8 @@ const (
JobStateSynced JobState = 6
// JobStateCancelling is used to mark the DDL job is cancelled by the client, but the DDL work hasn't handle it.
JobStateCancelling JobState = 7
// JobStateQueueing means the job has not yet been started.
JobStateQueueing JobState = 8
)

// String implements fmt.Stringer interface.
Expand All @@ -541,6 +551,8 @@ func (s JobState) String() string {
return "cancelling"
case JobStateSynced:
return "synced"
case JobStateQueueing:
return "queueing"
default:
return "none"
}
Expand Down
2 changes: 1 addition & 1 deletion parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s SchemaState) String() string {
case StateGlobalTxnOnly:
return "global txn only"
default:
return "queueing"
return "none"
}
}

Expand Down
2 changes: 1 addition & 1 deletion parser/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestJobStartTime(t *testing.T) {
BinlogInfo: &HistoryInfo{},
}
require.Equal(t, TSConvert2Time(job.StartTS), time.Unix(0, 0))
require.Equal(t, fmt.Sprintf("ID:123, Type:none, State:none, SchemaState:queueing, SchemaID:0, TableID:0, RowCount:0, ArgLen:0, start time: %s, Err:<nil>, ErrCount:0, SnapshotVersion:0", time.Unix(0, 0)), job.String())
require.Equal(t, fmt.Sprintf("ID:123, Type:none, State:none, SchemaState:none, SchemaID:0, TableID:0, RowCount:0, ArgLen:0, start time: %s, Err:<nil>, ErrCount:0, SnapshotVersion:0", time.Unix(0, 0)), job.String())
}

func TestJobCodec(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion util/admin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestGetDDLJobs(t *testing.T) {
currJobs2 = currJobs2[:0]
err = IterAllDDLJobs(txn, func(jobs []*model.Job) (b bool, e error) {
for _, job := range jobs {
if job.State == model.JobStateNone {
if job.NotStarted() {
currJobs2 = append(currJobs2, job)
} else {
return true, nil
Expand Down

0 comments on commit 67b0734

Please sign in to comment.