Skip to content

Commit

Permalink
ddl: refactor V2 job args of dropping column DDL. (#56021)
Browse files Browse the repository at this point in the history
ref #53930
  • Loading branch information
joccau authored Sep 20, 2024
1 parent fb9b8a1 commit 6bdf685
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 76 deletions.
21 changes: 14 additions & 7 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,12 @@ func onDropColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
if err != nil {
return ver, errors.Trace(err)
}
job.Args = append(job.Args, indexInfosToIDList(idxInfos))
dropColumnArgs, err := model.GetDropColumnArgs(job)
if err != nil {
return ver, errors.Trace(err)
}
dropColumnArgs.IndexIDs = indexInfosToIDList(idxInfos)
job.FillArgs(dropColumnArgs)
case model.StateDeleteOnly:
// delete only -> reorganization
colInfo.State = model.StateDeleteReorganization
Expand All @@ -215,7 +220,12 @@ func onDropColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
} else {
// We should set related index IDs for job
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.Args = append(job.Args, getPartitionIDs(tblInfo))
dropColumnArgs, err := model.GetDropColumnArgs(job)
if err != nil {
return ver, errors.Trace(err)
}
dropColumnArgs.PartitionIDs = getPartitionIDs(tblInfo)
job.FillArgs(dropColumnArgs)
}
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State))
Expand All @@ -231,16 +241,13 @@ func checkDropColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (*model.T
return nil, nil, nil, false, errors.Trace(err)
}

var colName pmodel.CIStr
var ifExists bool
// indexIDs is used to make sure we don't truncate args when decoding the rawArgs.
var indexIDs []int64
err = job.DecodeArgs(&colName, &ifExists, &indexIDs)
dropColumnArgs, err := model.GetDropColumnArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, false, errors.Trace(err)
}

colName, ifExists := dropColumnArgs.ColName, dropColumnArgs.IfExists
colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil || colInfo.Hidden {
job.State = model.JobStateCancelled
Expand Down
14 changes: 7 additions & 7 deletions pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ func TestBuildJobDependence(t *testing.T) {
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
// Add some non-add-index jobs.
job1 := &model.Job{ID: 1, TableID: 1, Type: model.ActionAddColumn}
job2 := &model.Job{ID: 2, TableID: 1, Type: model.ActionCreateTable}
job3 := &model.Job{ID: 3, TableID: 2, Type: model.ActionDropColumn}
job6 := &model.Job{ID: 6, TableID: 1, Type: model.ActionDropTable}
job7 := &model.Job{ID: 7, TableID: 2, Type: model.ActionModifyColumn}
job9 := &model.Job{ID: 9, SchemaID: 111, Type: model.ActionDropSchema}
job11 := &model.Job{Version: model.JobVersion1, ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}}
job1 := &model.Job{ID: 1, TableID: 1, Version: model.JobVersion1, Type: model.ActionAddColumn}
job2 := &model.Job{ID: 2, TableID: 1, Version: model.JobVersion1, Type: model.ActionCreateTable}
job3 := &model.Job{ID: 3, TableID: 2, Version: model.JobVersion1, Type: model.ActionDropColumn}
job6 := &model.Job{ID: 6, TableID: 1, Version: model.JobVersion1, Type: model.ActionDropTable}
job7 := &model.Job{ID: 7, TableID: 2, Version: model.JobVersion1, Type: model.ActionModifyColumn}
job9 := &model.Job{ID: 9, SchemaID: 111, Version: model.JobVersion1, Type: model.ActionDropSchema}
job11 := &model.Job{ID: 11, TableID: 2, Version: model.JobVersion1, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}}
err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
require.NoError(t, m.EnQueueDDLJob(job1))
Expand Down
19 changes: 8 additions & 11 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -400,19 +399,17 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
}
}
case model.ActionDropColumn:
var colName pmodel.CIStr
var ifExists bool
var indexIDs []int64
var partitionIDs []int64
if err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs); err != nil {
args, err := model.GetDropColumnArgs(job)
if err != nil {
return errors.Trace(err)
}
if len(indexIDs) > 0 {
if len(partitionIDs) == 0 {
return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, indexIDs, ea, "drop column: table ID"))

if len(args.IndexIDs) > 0 {
if len(args.PartitionIDs) == 0 {
return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, args.IndexIDs, ea, "drop column: table ID"))
}
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "drop column: partition table ID"); err != nil {
for _, pid := range args.PartitionIDs {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, args.IndexIDs, ea, "drop column: partition table ID"); err != nil {
return errors.Trace(err)
}
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3159,19 +3159,26 @@ func (e *executor) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.Al
}

job := &model.Job{
// to do(joccau)
// we should set Version = model.GetJobVerInUse() after refactor the actionMultiSchemaChange.
Version: model.JobVersion1,
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: model.StatePublic,
TableName: t.Meta().Name.L,
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []any{colName, spec.IfExists},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.DropColumnArgs{
ColName: colName,
IfExists: spec.IfExists,
}
// we need fill args here, because it will be added subjob which contains args and rawArgs from job.
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -6280,7 +6287,7 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (re
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
// In multiple schema change, we don't run the job.
// Instead, we merge all the jobs into one pending job.
return appendToSubJobs(mci, job)
return appendToSubJobs(mci, jobW)
}
// Get a global job ID and put the DDL job in the queue.
setDDLJobQuery(ctx, job)
Expand Down
24 changes: 12 additions & 12 deletions pkg/ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,30 +174,30 @@ func handleRollbackException(runJobErr error, proxyJobErr *terror.Error) error {
return nil
}

func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error {
err := fillMultiSchemaInfo(m, job)
func appendToSubJobs(m *model.MultiSchemaInfo, jobW *JobWrapper) error {
err := fillMultiSchemaInfo(m, jobW)
if err != nil {
return err
}
var reorgTp model.ReorgType
if job.ReorgMeta != nil {
reorgTp = job.ReorgMeta.ReorgTp
if jobW.ReorgMeta != nil {
reorgTp = jobW.ReorgMeta.ReorgTp
}
m.SubJobs = append(m.SubJobs, &model.SubJob{
Type: job.Type,
Args: job.Args,
RawArgs: job.RawArgs,
SchemaState: job.SchemaState,
SnapshotVer: job.SnapshotVer,
Type: jobW.Type,
Args: jobW.Args,
RawArgs: jobW.RawArgs,
SchemaState: jobW.SchemaState,
SnapshotVer: jobW.SnapshotVer,
Revertible: true,
CtxVars: job.CtxVars,
CtxVars: jobW.CtxVars,
ReorgTp: reorgTp,
UseCloud: false,
})
return nil
}

func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error) {
func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *JobWrapper) error {
switch job.Type {
case model.ActionAddColumn:
col := job.Args[0].(*table.Column)
Expand All @@ -210,7 +210,7 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
info.PositionColumns = append(info.PositionColumns, pos.RelativeColumn.Name)
}
case model.ActionDropColumn:
colName := job.Args[0].(pmodel.CIStr)
colName := job.JobArgs.(*model.DropColumnArgs).ColName
info.DropColumns = append(info.DropColumns, colName)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
indexName := job.Args[0].(pmodel.CIStr)
Expand Down
13 changes: 5 additions & 8 deletions pkg/ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/mathutil"
Expand Down Expand Up @@ -152,15 +151,13 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
}
return mathutil.Max(len(partitionIDs), 1), nil
case model.ActionDropColumn:
var colName pmodel.CIStr
var ifExists bool
var indexIDs []int64
var partitionIDs []int64
if err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs); err != nil {
args, err := model.GetDropColumnArgs(job)
if err != nil {
return 0, errors.Trace(err)
}
physicalCnt := mathutil.Max(len(partitionIDs), 1)
return physicalCnt * len(indexIDs), nil

physicalCnt := mathutil.Max(len(args.PartitionIDs), 1)
return physicalCnt * len(args.IndexIDs), nil
case model.ActionModifyColumn:
var indexIDs []int64
var partitionIDs []int64
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 34,
shard_count = 35,
deps = [
"//pkg/parser/charset",
"//pkg/parser/model",
Expand Down
58 changes: 32 additions & 26 deletions pkg/meta/model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,39 +490,45 @@ func (job *Job) FillFinishedArgs(args FinishedJobArgs) {
args.fillFinishedJob(job)
}

func marshalArgs(jobVer JobVersion, args []any) (json.RawMessage, error) {
if jobVer <= JobVersion1 {
rawArgs, err := json.Marshal(args)
return rawArgs, errors.Trace(err)
}

intest.Assert(jobVer == JobVersion2, "job version is not v2")
var arg any
if len(args) > 0 {
intest.Assert(len(args) == 1, "Job.Args should have only one element")
arg = args[0]
}

rawArgs, err := json.Marshal(arg)
return rawArgs, errors.Trace(err)
}

// Encode encodes job with json format.
// updateRawArgs is used to determine whether to update the raw args.
func (job *Job) Encode(updateRawArgs bool) ([]byte, error) {
var err error
if updateRawArgs {
if job.Version == JobVersion1 {
job.RawArgs, err = json.Marshal(job.Args)
if err != nil {
return nil, errors.Trace(err)
}
if job.MultiSchemaInfo != nil {
for _, sub := range job.MultiSchemaInfo.SubJobs {
// Only update the args of executing sub-jobs.
if sub.Args == nil {
continue
}
sub.RawArgs, err = json.Marshal(sub.Args)
if err != nil {
return nil, errors.Trace(err)
}
job.RawArgs, err = marshalArgs(job.Version, job.Args)
if err != nil {
return nil, errors.Trace(err)
}

if job.MultiSchemaInfo != nil {
for _, sub := range job.MultiSchemaInfo.SubJobs {
// Only update the args of executing sub-jobs.
if sub.Args == nil {
continue
}

sub.RawArgs, err = marshalArgs(job.Version, sub.Args)
if err != nil {
return nil, errors.Trace(err)
}
}
} else {
var arg any
if len(job.Args) > 0 {
intest.Assert(len(job.Args) == 1, "Job.Args should have only one element")
arg = job.Args[0]
}
job.RawArgs, err = json.Marshal(arg)
if err != nil {
return nil, errors.Trace(err)
}
// TODO remember update sub-jobs' RawArgs when we do it.
}
}

Expand Down
43 changes: 43 additions & 0 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,49 @@ func GetResourceGroupArgs(job *Job) (*ResourceGroupArgs, error) {
return getOrDecodeArgsV2[*ResourceGroupArgs](job)
}

// DropColumnArgs is the arguments of dropping column job.
type DropColumnArgs struct {
ColName pmodel.CIStr `json:"column_name,omitempty"`
IfExists bool `json:"if_exists,omitempty"`
// below 2 fields are filled during running.
IndexIDs []int64 `json:"index_ids,omitempty"`
PartitionIDs []int64 `json:"partition_ids,omitempty"`
}

func (a *DropColumnArgs) fillJob(job *Job) {
if job.Version <= JobVersion1 {
job.Args = []any{a.ColName, a.IfExists, a.IndexIDs, a.PartitionIDs}
} else {
job.Args = []any{a}
}
}

// GetDropColumnArgs gets the args for drop column ddl.
func GetDropColumnArgs(job *Job) (*DropColumnArgs, error) {
var (
colName pmodel.CIStr
ifExists bool
indexIDs []int64
partitionIDs []int64
)

if job.Version <= JobVersion1 {
err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs)
if err != nil {
return nil, errors.Trace(err)
}

return &DropColumnArgs{
ColName: colName,
IfExists: ifExists,
IndexIDs: indexIDs,
PartitionIDs: partitionIDs,
}, nil
}

return getOrDecodeArgsV2[*DropColumnArgs](job)
}

// RenameTablesArgs is the arguments for rename tables job.
type RenameTablesArgs struct {
RenameTableInfos []*RenameTableArgs `json:"rename_table_infos,omitempty"`
Expand Down
17 changes: 17 additions & 0 deletions pkg/meta/model/job_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,20 @@ func TestResourceGroupArgs(t *testing.T) {
}
}
}

func TestDropColumnArgs(t *testing.T) {
inArgs := &DropColumnArgs{
ColName: model.NewCIStr("col_name"),
IfExists: true,
IndexIDs: []int64{1, 2, 3},
PartitionIDs: []int64{4, 5, 6},
}

for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionDropColumn)))
args, err := GetDropColumnArgs(j2)
require.NoError(t, err)
require.Equal(t, inArgs, args)
}
}

0 comments on commit 6bdf685

Please sign in to comment.