Skip to content

Commit

Permalink
*: support show ddl jobs for sub-jobs (#36168)
Browse files Browse the repository at this point in the history
ref #14766
  • Loading branch information
tangenta authored Jul 13, 2022
1 parent d93bc7a commit 2f934d6
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 1 deletion.
35 changes: 35 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,41 @@ func TestMultiSchemaChangeModifyColumnsCancelled(t *testing.T) {
Check(testkit.Rows("int"))
}

func TestMultiSchemaChangeAdminShowDDLJobs(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
originHook := dom.DDL().GetHook()
hook := &ddl.TestDDLCallback{Do: dom}
hook.OnJobRunBeforeExported = func(job *model.Job) {
assert.Equal(t, model.ActionMultiSchemaChange, job.Type)
if job.MultiSchemaInfo.SubJobs[0].SchemaState == model.StateDeleteOnly {
newTk := testkit.NewTestKit(t, store)
rows := newTk.MustQuery("admin show ddl jobs 1").Rows()
// 1 history job and 1 running job with 2 subjobs
assert.Equal(t, len(rows), 4)
assert.Equal(t, rows[1][1], "test")
assert.Equal(t, rows[1][2], "t")
assert.Equal(t, rows[1][3], "add index /* subjob */")
assert.Equal(t, rows[1][4], "delete only")
assert.Equal(t, rows[1][len(rows[1])-1], "running")

assert.Equal(t, rows[2][3], "add index /* subjob */")
assert.Equal(t, rows[2][4], "none")
assert.Equal(t, rows[2][len(rows[2])-1], "queueing")
}
}

tk.MustExec("create table t (a int, b int, c int)")
tk.MustExec("insert into t values (1, 2, 3)")

dom.DDL().SetHook(hook)
tk.MustExec("alter table t add index t(a), add index t1(b)")
dom.DDL().SetHook(originHook)
}

func TestMultiSchemaChangeWithExpressionIndex(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
Expand Down
16 changes: 16 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,22 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che
req.AppendNull(10)
}
req.AppendString(11, job.State.String())
if job.Type == model.ActionMultiSchemaChange {
for _, subJob := range job.MultiSchemaInfo.SubJobs {
req.AppendInt64(0, job.ID)
req.AppendString(1, schemaName)
req.AppendString(2, tableName)
req.AppendString(3, subJob.Type.String()+" /* subjob */")
req.AppendString(4, subJob.SchemaState.String())
req.AppendInt64(5, job.SchemaID)
req.AppendInt64(6, job.TableID)
req.AppendInt64(7, subJob.RowCount)
req.AppendNull(8)
req.AppendNull(9)
req.AppendNull(10)
req.AppendString(11, subJob.State.String())
}
}
}

func ts2Time(timestamp uint64, loc *time.Location) types.Time {
Expand Down
8 changes: 8 additions & 0 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,9 @@ func (e *DDLJobsReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
for i := e.cursor; i < e.cursor+num; i++ {
e.appendJobToChunk(req, e.runningJobs[i], checker)
req.AppendString(12, e.runningJobs[i].Query)
for range e.runningJobs[i].MultiSchemaInfo.SubJobs {
req.AppendString(12, e.runningJobs[i].Query)
}
}
e.cursor += num
count += num
Expand All @@ -1318,6 +1321,11 @@ func (e *DDLJobsReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
for _, job := range e.cacheJobs {
e.appendJobToChunk(req, job, checker)
req.AppendString(12, job.Query)
if job.Type == model.ActionMultiSchemaChange {
for range job.MultiSchemaInfo.SubJobs {
req.AppendString(12, job.Query)
}
}
}
e.cursor += len(e.cacheJobs)
}
Expand Down
5 changes: 5 additions & 0 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ func TestDDLJobs(t *testing.T) {
DDLJobsTester.MustExec("set role r_priv")
DDLJobsTester.MustQuery("select DB_NAME, TABLE_NAME from information_schema.DDL_JOBS where DB_NAME = 'test_ddl_jobs' and TABLE_NAME = 't';").Check(
testkit.Rows("test_ddl_jobs t"))

tk.MustExec("create table tt (a int);")
tk.MustExec("alter table tt add index t(a), add column b int")
tk.MustQuery("select db_name, table_name, job_type from information_schema.DDL_JOBS limit 3").Check(
testkit.Rows("test_ddl_jobs tt alter table multi-schema change", "test_ddl_jobs tt add index /* subjob */", "test_ddl_jobs tt add column /* subjob */"))
}

func TestKeyColumnUsage(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,12 @@ func (job *Job) DecodeArgs(args ...interface{}) error {
// String implements fmt.Stringer interface.
func (job *Job) String() string {
rowCount := job.GetRowCount()
return fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaState:%s, SchemaID:%d, TableID:%d, RowCount:%d, ArgLen:%d, start time: %v, Err:%v, ErrCount:%d, SnapshotVersion:%v",
ret := fmt.Sprintf("ID:%d, Type:%s, State:%s, SchemaState:%s, SchemaID:%d, TableID:%d, RowCount:%d, ArgLen:%d, start time: %v, Err:%v, ErrCount:%d, SnapshotVersion:%v",
job.ID, job.Type, job.State, job.SchemaState, job.SchemaID, job.TableID, rowCount, len(job.Args), TSConvert2Time(job.StartTS), job.Error, job.ErrorCount, job.SnapshotVer)
if job.Type != ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
ret += fmt.Sprintf(", Multi-Schema Change:true, Revertible:%v", job.MultiSchemaInfo.Revertible)
}
return ret
}

func (job *Job) hasDependentSchema(other *Job) (bool, error) {
Expand Down

0 comments on commit 2f934d6

Please sign in to comment.