diff --git a/pkg/ddl/foreign_key_test.go b/pkg/ddl/foreign_key_test.go index a481525bdd0bd..d745d57b39891 100644 --- a/pkg/ddl/foreign_key_test.go +++ b/pkg/ddl/foreign_key_test.go @@ -71,7 +71,6 @@ func testCreateForeignKey(t *testing.T, d ddl.ExecutorForTest, ctx sessionctx.Co ctx.SetValue(sessionctx.QueryString, "skip") args := &model.AddForeignKeyArgs{FkInfo: fkInfo} - job.FillArgs(args) err = d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) return job @@ -89,7 +88,6 @@ func testDropForeignKey(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForT } ctx.SetValue(sessionctx.QueryString, "skip") args := &model.DropForeignKeyArgs{FkName: pmodel.NewCIStr(foreignKeyName)} - job.FillArgs(args) err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) diff --git a/pkg/ddl/table_test.go b/pkg/ddl/table_test.go index 9d58ba47e1a77..588554028bf9e 100644 --- a/pkg/ddl/table_test.go +++ b/pkg/ddl/table_test.go @@ -93,9 +93,12 @@ func testRenameTables(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTes }, } - args := model.GetRenameTablesArgsFromV1( - oldSchemaIDs, oldSchemaNames, oldTableNames, - newSchemaIDs, newTableNames, oldTableIDs) + args := &model.RenameTablesArgs{ + RenameTableInfos: model.GetRenameTablesArgsFromV1( + oldSchemaIDs, oldSchemaNames, oldTableNames, + newSchemaIDs, newTableNames, oldTableIDs, + ), + } ctx.SetValue(sessionctx.QueryString, "skip") require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true))) diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index ea73b172dee40..f885191f1b4ff 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -486,7 +486,7 @@ func (job *Job) GetWarnings() (map[errors.ErrorID]*terror.Error, map[errors.Erro func (job *Job) FillArgs(args JobArgs) { intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") if job.Version == JobVersion1 { - args.fillJobV1(job) + job.Args = args.getArgsV1(job) return } job.Args = []any{args} @@ -496,7 +496,7 @@ func (job *Job) FillArgs(args JobArgs) { func (job *Job) FillFinishedArgs(args FinishedJobArgs) { intest.Assert(job.Version == JobVersion1 || job.Version == JobVersion2, "job version is invalid") if job.Version == JobVersion1 { - args.fillFinishedJobV1(job) + job.Args = args.getFinishedArgsV1(job) return } job.Args = []any{args} diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index bd221375ba41a..f127e629710a9 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -55,8 +55,21 @@ type RecoverSchemaInfo struct { OldSchemaName pmodel.CIStr } -// getOrDecodeArgsV2 get the argsV2 from job, if the argsV2 is nil, decode rawArgsV2 -// and fill argsV2. +// getOrDecodeArgsV1 get the args v1 from job, if the job.Args is nil, decode job.RawArgs +// and cache in job.Args. +// as there is no way to create a generic struct with a type parameter in Go, we +// have to pass one instance of the struct to the function. +func getOrDecodeArgsV1[T JobArgs](args T, job *Job) (T, error) { + intest.Assert(job.Version == JobVersion1, "job version is not v1") + var v T + if err := args.decodeV1(job); err != nil { + return v, errors.Trace(err) + } + return args, nil +} + +// getOrDecodeArgsV2 get the args v2 from job, if the job.Args is nil, decode job.RawArgs +// and cache in job.Args. func getOrDecodeArgsV2[T JobArgs](job *Job) (T, error) { intest.Assert(job.Version == JobVersion2, "job version is not v2") if len(job.Args) > 0 { @@ -71,11 +84,19 @@ func getOrDecodeArgsV2[T JobArgs](job *Job) (T, error) { return v, nil } +func getOrDecodeArgs[T JobArgs](args T, job *Job) (T, error) { + if job.Version == JobVersion1 { + return getOrDecodeArgsV1[T](args, job) + } + return getOrDecodeArgsV2[T](job) +} + // JobArgs is the interface for job arguments. type JobArgs interface { - // fillJobV1 fills the job args v1 for submitting job. we make it private to - // avoid calling it directly, use Job.FillArgs to fill the job args. - fillJobV1(job *Job) + // getArgsV1 gets the job args for v1. we make it private to avoid calling it + // directly, use Job.FillArgs to fill the job args. + getArgsV1(job *Job) []any + decodeV1(job *Job) error } // FinishedJobArgs is the interface for finished job arguments. @@ -83,41 +104,39 @@ type JobArgs interface { // will write some args back to the job for other components. type FinishedJobArgs interface { JobArgs - // fillFinishedJobV1 fills the job args for finished job. we make it private + // getFinishedArgsV1 fills the job args for finished job. we make it private // to avoid calling it directly, use Job.FillFinishedArgs to fill the job args. - fillFinishedJobV1(job *Job) + getFinishedArgsV1(job *Job) []any } // EmptyArgs is the args for ddl job with no args. type EmptyArgs struct{} -func (*EmptyArgs) fillJobV1(*Job) {} +func (*EmptyArgs) getArgsV1(*Job) []any { + return nil +} + +func (*EmptyArgs) decodeV1(*Job) error { + return nil +} // CreateSchemaArgs is the arguments for create schema job. type CreateSchemaArgs struct { DBInfo *DBInfo `json:"db_info,omitempty"` } -func (a *CreateSchemaArgs) fillJobV1(job *Job) { - job.Args = []any{a.DBInfo} +func (a *CreateSchemaArgs) getArgsV1(*Job) []any { + return []any{a.DBInfo} +} + +func (a *CreateSchemaArgs) decodeV1(job *Job) error { + a.DBInfo = &DBInfo{} + return errors.Trace(job.DecodeArgs(a.DBInfo)) } // GetCreateSchemaArgs gets the args for create schema job. func GetCreateSchemaArgs(job *Job) (*CreateSchemaArgs, error) { - if job.Version == JobVersion1 { - dbInfo := &DBInfo{} - err := job.DecodeArgs(dbInfo) - if err != nil { - return nil, errors.Trace(err) - } - return &CreateSchemaArgs{DBInfo: dbInfo}, nil - } - - argsV2, err := getOrDecodeArgsV2[*CreateSchemaArgs](job) - if err != nil { - return nil, errors.Trace(err) - } - return argsV2, nil + return getOrDecodeArgs[*CreateSchemaArgs](&CreateSchemaArgs{}, job) } // DropSchemaArgs is the arguments for drop schema job. @@ -128,38 +147,31 @@ type DropSchemaArgs struct { AllDroppedTableIDs []int64 `json:"all_dropped_table_ids,omitempty"` } -func (a *DropSchemaArgs) fillJobV1(job *Job) { - job.Args = []any{a.FKCheck} +func (a *DropSchemaArgs) getArgsV1(*Job) []any { + return []any{a.FKCheck} +} + +func (a *DropSchemaArgs) getFinishedArgsV1(*Job) []any { + return []any{a.AllDroppedTableIDs} } -func (a *DropSchemaArgs) fillFinishedJobV1(job *Job) { - job.Args = []any{a.AllDroppedTableIDs} +func (a *DropSchemaArgs) decodeV1(job *Job) error { + return job.DecodeArgs(&a.FKCheck) } // GetDropSchemaArgs gets the args for drop schema job. func GetDropSchemaArgs(job *Job) (*DropSchemaArgs, error) { - return getDropSchemaArgs(job, false) + return getOrDecodeArgs[*DropSchemaArgs](&DropSchemaArgs{}, job) } // GetFinishedDropSchemaArgs gets the args for drop schema job after the job is finished. func GetFinishedDropSchemaArgs(job *Job) (*DropSchemaArgs, error) { - return getDropSchemaArgs(job, true) -} - -func getDropSchemaArgs(job *Job, argsOfFinished bool) (*DropSchemaArgs, error) { if job.Version == JobVersion1 { - if argsOfFinished { - var physicalTableIDs []int64 - if err := job.DecodeArgs(&physicalTableIDs); err != nil { - return nil, err - } - return &DropSchemaArgs{AllDroppedTableIDs: physicalTableIDs}, nil - } - var fkCheck bool - if err := job.DecodeArgs(&fkCheck); err != nil { + var physicalTableIDs []int64 + if err := job.DecodeArgs(&physicalTableIDs); err != nil { return nil, err } - return &DropSchemaArgs{FKCheck: fkCheck}, nil + return &DropSchemaArgs{AllDroppedTableIDs: physicalTableIDs}, nil } return getOrDecodeArgsV2[*DropSchemaArgs](job) } @@ -174,38 +186,23 @@ type ModifySchemaArgs struct { PolicyRef *PolicyRefInfo `json:"policy_ref,omitempty"` } -func (a *ModifySchemaArgs) fillJobV1(job *Job) { +func (a *ModifySchemaArgs) getArgsV1(job *Job) []any { + if job.Type == ActionModifySchemaCharsetAndCollate { + return []any{a.ToCharset, a.ToCollate} + } + return []any{a.PolicyRef} +} + +func (a *ModifySchemaArgs) decodeV1(job *Job) error { if job.Type == ActionModifySchemaCharsetAndCollate { - job.Args = []any{a.ToCharset, a.ToCollate} - } else if job.Type == ActionModifySchemaDefaultPlacement { - job.Args = []any{a.PolicyRef} + return errors.Trace(job.DecodeArgs(&a.ToCharset, &a.ToCollate)) } + return errors.Trace(job.DecodeArgs(&a.PolicyRef)) } // GetModifySchemaArgs gets the modify schema args. func GetModifySchemaArgs(job *Job) (*ModifySchemaArgs, error) { - if job.Version == JobVersion1 { - var ( - toCharset string - toCollate string - policyRef *PolicyRefInfo - ) - if job.Type == ActionModifySchemaCharsetAndCollate { - if err := job.DecodeArgs(&toCharset, &toCollate); err != nil { - return nil, errors.Trace(err) - } - } else if job.Type == ActionModifySchemaDefaultPlacement { - if err := job.DecodeArgs(&policyRef); err != nil { - return nil, errors.Trace(err) - } - } - return &ModifySchemaArgs{ - ToCharset: toCharset, - ToCollate: toCollate, - PolicyRef: policyRef, - }, nil - } - return getOrDecodeArgsV2[*ModifySchemaArgs](job) + return getOrDecodeArgs[*ModifySchemaArgs](&ModifySchemaArgs{}, job) } // CreateTableArgs is the arguments for create table/view/sequence job. @@ -218,48 +215,34 @@ type CreateTableArgs struct { FKCheck bool `json:"fk_check,omitempty"` } -func (a *CreateTableArgs) fillJobV1(job *Job) { +func (a *CreateTableArgs) getArgsV1(job *Job) []any { switch job.Type { case ActionCreateTable: - job.Args = []any{a.TableInfo, a.FKCheck} + return []any{a.TableInfo, a.FKCheck} case ActionCreateView: - job.Args = []any{a.TableInfo, a.OnExistReplace, a.OldViewTblID} + return []any{a.TableInfo, a.OnExistReplace, a.OldViewTblID} case ActionCreateSequence: - job.Args = []any{a.TableInfo} + return []any{a.TableInfo} } + return nil +} + +func (a *CreateTableArgs) decodeV1(job *Job) error { + a.TableInfo = &TableInfo{} + switch job.Type { + case ActionCreateTable: + return errors.Trace(job.DecodeArgs(a.TableInfo, &a.FKCheck)) + case ActionCreateView: + return errors.Trace(job.DecodeArgs(a.TableInfo, &a.OnExistReplace, &a.OldViewTblID)) + case ActionCreateSequence: + return errors.Trace(job.DecodeArgs(a.TableInfo)) + } + return nil } // GetCreateTableArgs gets the create-table args. func GetCreateTableArgs(job *Job) (*CreateTableArgs, error) { - if job.Version == JobVersion1 { - var ( - tableInfo = &TableInfo{} - onExistReplace bool - oldViewTblID int64 - fkCheck bool - ) - switch job.Type { - case ActionCreateTable: - if err := job.DecodeArgs(tableInfo, &fkCheck); err != nil { - return nil, errors.Trace(err) - } - case ActionCreateView: - if err := job.DecodeArgs(tableInfo, &onExistReplace, &oldViewTblID); err != nil { - return nil, errors.Trace(err) - } - case ActionCreateSequence: - if err := job.DecodeArgs(tableInfo); err != nil { - return nil, errors.Trace(err) - } - } - return &CreateTableArgs{ - TableInfo: tableInfo, - OnExistReplace: onExistReplace, - OldViewTblID: oldViewTblID, - FKCheck: fkCheck, - }, nil - } - return getOrDecodeArgsV2[*CreateTableArgs](job) + return getOrDecodeArgs[*CreateTableArgs](&CreateTableArgs{}, job) } // BatchCreateTableArgs is the arguments for batch create table job. @@ -267,31 +250,32 @@ type BatchCreateTableArgs struct { Tables []*CreateTableArgs `json:"tables,omitempty"` } -func (a *BatchCreateTableArgs) fillJobV1(job *Job) { +func (a *BatchCreateTableArgs) getArgsV1(*Job) []any { infos := make([]*TableInfo, 0, len(a.Tables)) for _, info := range a.Tables { infos = append(infos, info.TableInfo) } - job.Args = []any{infos, a.Tables[0].FKCheck} + return []any{infos, a.Tables[0].FKCheck} +} + +func (a *BatchCreateTableArgs) decodeV1(job *Job) error { + var ( + tableInfos []*TableInfo + fkCheck bool + ) + if err := job.DecodeArgs(&tableInfos, &fkCheck); err != nil { + return errors.Trace(err) + } + a.Tables = make([]*CreateTableArgs, 0, len(tableInfos)) + for _, info := range tableInfos { + a.Tables = append(a.Tables, &CreateTableArgs{TableInfo: info, FKCheck: fkCheck}) + } + return nil } // GetBatchCreateTableArgs gets the batch create-table args. func GetBatchCreateTableArgs(job *Job) (*BatchCreateTableArgs, error) { - if job.Version == JobVersion1 { - var ( - tableInfos []*TableInfo - fkCheck bool - ) - if err := job.DecodeArgs(&tableInfos, &fkCheck); err != nil { - return nil, errors.Trace(err) - } - args := &BatchCreateTableArgs{Tables: make([]*CreateTableArgs, 0, len(tableInfos))} - for _, info := range tableInfos { - args.Tables = append(args.Tables, &CreateTableArgs{TableInfo: info, FKCheck: fkCheck}) - } - return args, nil - } - return getOrDecodeArgsV2[*BatchCreateTableArgs](job) + return getOrDecodeArgs[*BatchCreateTableArgs](&BatchCreateTableArgs{}, job) } // DropTableArgs is the arguments for drop table/view/sequence job. @@ -308,15 +292,16 @@ type DropTableArgs struct { OldRuleIDs []string `json:"old_rule_ids,omitempty"` } -func (a *DropTableArgs) fillJobV1(job *Job) { - // only drop table job has in args. +func (a *DropTableArgs) getArgsV1(job *Job) []any { + // only drop-table job has in args, drop view/sequence job has no args. if job.Type == ActionDropTable { - job.Args = []any{a.Identifiers, a.FKCheck} + return []any{a.Identifiers, a.FKCheck} } + return nil } -func (a *DropTableArgs) fillFinishedJobV1(job *Job) { - job.Args = []any{a.StartKey, a.OldPartitionIDs, a.OldRuleIDs} +func (a *DropTableArgs) getFinishedArgsV1(*Job) []any { + return []any{a.StartKey, a.OldPartitionIDs, a.OldRuleIDs} } func (a *DropTableArgs) decodeV1(job *Job) error { @@ -326,14 +311,7 @@ func (a *DropTableArgs) decodeV1(job *Job) error { // GetDropTableArgs gets the drop-table args. func GetDropTableArgs(job *Job) (*DropTableArgs, error) { - if job.Version == JobVersion1 { - args := &DropTableArgs{} - if err := args.decodeV1(job); err != nil { - return nil, errors.Trace(err) - } - return args, nil - } - return getOrDecodeArgsV2[*DropTableArgs](job) + return getOrDecodeArgs[*DropTableArgs](&DropTableArgs{}, job) } // GetFinishedDropTableArgs gets the drop-table args after the job is finished. @@ -368,72 +346,55 @@ type TruncateTableArgs struct { OldPartIDsWithPolicy []int64 `json:"-"` } -func (a *TruncateTableArgs) fillJobV1(job *Job) { +func (a *TruncateTableArgs) getArgsV1(job *Job) []any { if job.Type == ActionTruncateTable { // Args[0] is the new table ID, args[2] is the ids for table partitions, we // add a placeholder here, they will be filled by job submitter. // the last param is not required for execution, we need it to calculate // number of new IDs to generate. - job.Args = []any{a.NewTableID, a.FKCheck, a.NewPartitionIDs, len(a.OldPartitionIDs)} - } else { - job.Args = []any{a.OldPartitionIDs, a.NewPartitionIDs} + return []any{a.NewTableID, a.FKCheck, a.NewPartitionIDs, len(a.OldPartitionIDs)} } + return []any{a.OldPartitionIDs, a.NewPartitionIDs} } func (a *TruncateTableArgs) decodeV1(job *Job) error { - var err error if job.Type == ActionTruncateTable { - err = job.DecodeArgs(&a.NewTableID, &a.FKCheck, &a.NewPartitionIDs) - } else { - err = job.DecodeArgs(&a.OldPartitionIDs, &a.NewPartitionIDs) + return errors.Trace(job.DecodeArgs(&a.NewTableID, &a.FKCheck, &a.NewPartitionIDs)) } - return err + return errors.Trace(job.DecodeArgs(&a.OldPartitionIDs, &a.NewPartitionIDs)) } -func (a *TruncateTableArgs) fillFinishedJobV1(job *Job) { +func (a *TruncateTableArgs) getFinishedArgsV1(job *Job) []any { if job.Type == ActionTruncateTable { // the first param is the start key of the old table, it's not used anywhere // now, so we fill an empty byte slice here. // we can call tablecodec.EncodeTablePrefix(tableID) to get it. - job.Args = []any{[]byte{}, a.OldPartitionIDs} - } else { - job.Args = []any{a.OldPartitionIDs} + return []any{[]byte{}, a.OldPartitionIDs} } + return []any{a.OldPartitionIDs} } // GetTruncateTableArgs gets the truncate table args. func GetTruncateTableArgs(job *Job) (*TruncateTableArgs, error) { - return getTruncateTableArgs(job, false) + return getOrDecodeArgs[*TruncateTableArgs](&TruncateTableArgs{}, job) } // GetFinishedTruncateTableArgs gets the truncate table args after the job is finished. func GetFinishedTruncateTableArgs(job *Job) (*TruncateTableArgs, error) { - return getTruncateTableArgs(job, true) -} - -func getTruncateTableArgs(job *Job, argsOfFinished bool) (*TruncateTableArgs, error) { if job.Version == JobVersion1 { - if argsOfFinished { - if job.Type == ActionTruncateTable { - var startKey []byte - var oldPartitionIDs []int64 - if err := job.DecodeArgs(&startKey, &oldPartitionIDs); err != nil { - return nil, errors.Trace(err) - } - return &TruncateTableArgs{OldPartitionIDs: oldPartitionIDs}, nil - } + if job.Type == ActionTruncateTable { + var startKey []byte var oldPartitionIDs []int64 - if err := job.DecodeArgs(&oldPartitionIDs); err != nil { + if err := job.DecodeArgs(&startKey, &oldPartitionIDs); err != nil { return nil, errors.Trace(err) } return &TruncateTableArgs{OldPartitionIDs: oldPartitionIDs}, nil } - - args := &TruncateTableArgs{} - if err := args.decodeV1(job); err != nil { + var oldPartitionIDs []int64 + if err := job.DecodeArgs(&oldPartitionIDs); err != nil { return nil, errors.Trace(err) } - return args, nil + return &TruncateTableArgs{OldPartitionIDs: oldPartitionIDs}, nil } return getOrDecodeArgsV2[*TruncateTableArgs](job) @@ -457,20 +418,19 @@ type TablePartitionArgs struct { OldPhysicalTblIDs []int64 `json:"old_physical_tbl_ids,omitempty"` } -func (a *TablePartitionArgs) fillJobV1(job *Job) { +func (a *TablePartitionArgs) getArgsV1(job *Job) []any { if job.Type == ActionAddTablePartition { - job.Args = []any{a.PartInfo} + return []any{a.PartInfo} } else if job.Type == ActionDropTablePartition { - job.Args = []any{a.PartNames} - } else { - job.Args = []any{a.PartNames, a.PartInfo} + return []any{a.PartNames} } + return []any{a.PartNames, a.PartInfo} } -func (a *TablePartitionArgs) fillFinishedJobV1(job *Job) { +func (a *TablePartitionArgs) getFinishedArgsV1(job *Job) []any { intest.Assert(job.Type != ActionAddTablePartition || job.State == JobStateRollbackDone, - "add table partition job should not call fillFinishedJobV1 if not rollback") - job.Args = []any{a.OldPhysicalTblIDs} + "add table partition job should not call getFinishedArgsV1 if not rollback") + return []any{a.OldPhysicalTblIDs} } func (a *TablePartitionArgs) decodeV1(job *Job) error { @@ -504,14 +464,7 @@ func (a *TablePartitionArgs) decodeV1(job *Job) error { // GetTablePartitionArgs gets the table partition args. func GetTablePartitionArgs(job *Job) (*TablePartitionArgs, error) { - if job.Version == JobVersion1 { - args := &TablePartitionArgs{} - if err := args.decodeV1(job); err != nil { - return nil, errors.Trace(err) - } - return args, nil - } - args, err := getOrDecodeArgsV2[*TablePartitionArgs](job) + args, err := getOrDecodeArgs[*TablePartitionArgs](&TablePartitionArgs{}, job) if err != nil { return nil, errors.Trace(err) } @@ -562,24 +515,17 @@ type ExchangeTablePartitionArgs struct { WithValidation bool `json:"with_validation,omitempty"` } -func (a *ExchangeTablePartitionArgs) fillJobV1(job *Job) { - job.Args = []any{a.PartitionID, a.PTSchemaID, a.PTTableID, a.PartitionName, a.WithValidation} +func (a *ExchangeTablePartitionArgs) getArgsV1(*Job) []any { + return []any{a.PartitionID, a.PTSchemaID, a.PTTableID, a.PartitionName, a.WithValidation} } func (a *ExchangeTablePartitionArgs) decodeV1(job *Job) error { - return job.DecodeArgs(&a.PartitionID, &a.PTSchemaID, &a.PTTableID, &a.PartitionName, &a.WithValidation) + return errors.Trace(job.DecodeArgs(&a.PartitionID, &a.PTSchemaID, &a.PTTableID, &a.PartitionName, &a.WithValidation)) } // GetExchangeTablePartitionArgs gets the exchange table partition args. func GetExchangeTablePartitionArgs(job *Job) (*ExchangeTablePartitionArgs, error) { - if job.Version == JobVersion1 { - args := &ExchangeTablePartitionArgs{} - if err := args.decodeV1(job); err != nil { - return nil, errors.Trace(err) - } - return args, nil - } - return getOrDecodeArgsV2[*ExchangeTablePartitionArgs](job) + return getOrDecodeArgs[*ExchangeTablePartitionArgs](&ExchangeTablePartitionArgs{}, job) } // AlterTablePartitionArgs is the arguments for alter table partition job. @@ -592,31 +538,23 @@ type AlterTablePartitionArgs struct { PolicyRefInfo *PolicyRefInfo `json:"policy_ref_info,omitempty"` } -func (a *AlterTablePartitionArgs) fillJobV1(job *Job) { +func (a *AlterTablePartitionArgs) getArgsV1(job *Job) []any { if job.Type == ActionAlterTablePartitionAttributes { - job.Args = []any{a.PartitionID, a.LabelRule} - } else { - job.Args = []any{a.PartitionID, a.PolicyRefInfo} + return []any{a.PartitionID, a.LabelRule} } + return []any{a.PartitionID, a.PolicyRefInfo} } func (a *AlterTablePartitionArgs) decodeV1(job *Job) error { if job.Type == ActionAlterTablePartitionAttributes { - return job.DecodeArgs(&a.PartitionID, &a.LabelRule) + return errors.Trace(job.DecodeArgs(&a.PartitionID, &a.LabelRule)) } - return job.DecodeArgs(&a.PartitionID, &a.PolicyRefInfo) + return errors.Trace(job.DecodeArgs(&a.PartitionID, &a.PolicyRefInfo)) } // GetAlterTablePartitionArgs gets the alter table partition args. func GetAlterTablePartitionArgs(job *Job) (*AlterTablePartitionArgs, error) { - if job.Version == JobVersion1 { - args := &AlterTablePartitionArgs{} - if err := args.decodeV1(job); err != nil { - return nil, errors.Trace(err) - } - return args, nil - } - return getOrDecodeArgsV2[*AlterTablePartitionArgs](job) + return getOrDecodeArgs[*AlterTablePartitionArgs](&AlterTablePartitionArgs{}, job) } // RenameTableArgs is the arguments for rename table DDL job. @@ -633,45 +571,26 @@ type RenameTableArgs struct { TableID int64 `json:"table_id,omitempty"` } -func (rt *RenameTableArgs) fillJobV1(job *Job) { - job.Args = []any{rt.OldSchemaID, rt.NewTableName, rt.OldSchemaName} +func (rt *RenameTableArgs) getArgsV1(*Job) []any { + return []any{rt.OldSchemaID, rt.NewTableName, rt.OldSchemaName} +} + +func (rt *RenameTableArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&rt.OldSchemaID, &rt.NewTableName, &rt.OldSchemaName)) } // GetRenameTableArgs get the arguments from job. func GetRenameTableArgs(job *Job) (*RenameTableArgs, error) { - var ( - oldSchemaID int64 - oldSchemaName pmodel.CIStr - newTableName pmodel.CIStr - args *RenameTableArgs - err error - ) - - if job.Version == JobVersion1 { - // decode args and cache in args. - err = job.DecodeArgs(&oldSchemaID, &newTableName, &oldSchemaName) - if err != nil { - return nil, errors.Trace(err) - } - args = &RenameTableArgs{ - OldSchemaID: oldSchemaID, - OldSchemaName: oldSchemaName, - NewTableName: newTableName, - } - } else { - // for version V2 - args, err = getOrDecodeArgsV2[*RenameTableArgs](job) - if err != nil { - return nil, errors.Trace(err) - } + args, err := getOrDecodeArgs[*RenameTableArgs](&RenameTableArgs{}, job) + if err != nil { + return nil, errors.Trace(err) } - // NewSchemaID is used for checkAndRenameTables, which is not set for rename table. args.NewSchemaID = job.SchemaID return args, nil } -// UpdateRenameTableArgs updates the rename table args. +// UpdateRenameTableArgs updates the rename-table args. // need to reset the old schema ID to new schema ID. func UpdateRenameTableArgs(job *Job) error { var err error @@ -701,52 +620,38 @@ func UpdateRenameTableArgs(job *Job) error { return nil } -// CheckConstraintArgs is the arguments for both AlterCheckConstraint and DropCheckConstraint job. -type CheckConstraintArgs struct { - ConstraintName pmodel.CIStr `json:"constraint_name,omitempty"` - Enforced bool `json:"enforced,omitempty"` -} - -func (a *CheckConstraintArgs) fillJobV1(job *Job) { - job.Args = []any{a.ConstraintName, a.Enforced} -} - // ResourceGroupArgs is the arguments for resource group job. type ResourceGroupArgs struct { // for DropResourceGroup we only use it to store the name, other fields are invalid. RGInfo *ResourceGroupInfo `json:"rg_info,omitempty"` } -func (a *ResourceGroupArgs) fillJobV1(job *Job) { +func (a *ResourceGroupArgs) getArgsV1(job *Job) []any { if job.Type == ActionCreateResourceGroup { // what's the second parameter for? we keep it for compatibility. - job.Args = []any{a.RGInfo, false} + return []any{a.RGInfo, false} } else if job.Type == ActionAlterResourceGroup { - job.Args = []any{a.RGInfo} + return []any{a.RGInfo} } else if job.Type == ActionDropResourceGroup { // it's not used anywhere. - job.Args = []any{a.RGInfo.Name} + return []any{a.RGInfo.Name} } + return nil +} + +func (a *ResourceGroupArgs) decodeV1(job *Job) error { + a.RGInfo = &ResourceGroupInfo{} + if job.Type == ActionCreateResourceGroup || job.Type == ActionAlterResourceGroup { + return errors.Trace(job.DecodeArgs(a.RGInfo)) + } else if job.Type == ActionDropResourceGroup { + return errors.Trace(job.DecodeArgs(&a.RGInfo.Name)) + } + return nil } // GetResourceGroupArgs gets the resource group args. func GetResourceGroupArgs(job *Job) (*ResourceGroupArgs, error) { - if job.Version == JobVersion1 { - rgInfo := ResourceGroupInfo{} - if job.Type == ActionCreateResourceGroup || job.Type == ActionAlterResourceGroup { - if err := job.DecodeArgs(&rgInfo); err != nil { - return nil, errors.Trace(err) - } - } else if job.Type == ActionDropResourceGroup { - var rgName pmodel.CIStr - if err := job.DecodeArgs(&rgName); err != nil { - return nil, errors.Trace(err) - } - rgInfo.Name = rgName - } - return &ResourceGroupArgs{RGInfo: &rgInfo}, nil - } - return getOrDecodeArgsV2[*ResourceGroupArgs](job) + return getOrDecodeArgs[*ResourceGroupArgs](&ResourceGroupArgs{}, job) } // RebaseAutoIDArgs is the arguments for ActionRebaseAutoID DDL. @@ -756,29 +661,17 @@ type RebaseAutoIDArgs struct { Force bool `json:"force,omitempty"` } -func (a *RebaseAutoIDArgs) fillJobV1(job *Job) { - job.Args = []any{a.NewBase, a.Force} +func (a *RebaseAutoIDArgs) getArgsV1(*Job) []any { + return []any{a.NewBase, a.Force} +} + +func (a *RebaseAutoIDArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.NewBase, &a.Force)) } // GetRebaseAutoIDArgs the args for ActionRebaseAutoID/ActionRebaseAutoRandomBase ddl. func GetRebaseAutoIDArgs(job *Job) (*RebaseAutoIDArgs, error) { - var ( - newBase int64 - force bool - ) - - if job.Version == JobVersion1 { - if err := job.DecodeArgs(&newBase, &force); err != nil { - return nil, errors.Trace(err) - } - return &RebaseAutoIDArgs{ - NewBase: newBase, - Force: force, - }, nil - } - - // for version V2 - return getOrDecodeArgsV2[*RebaseAutoIDArgs](job) + return getOrDecodeArgs[*RebaseAutoIDArgs](&RebaseAutoIDArgs{}, job) } // ModifyTableCommentArgs is the arguments for ActionModifyTableComment ddl. @@ -786,23 +679,17 @@ type ModifyTableCommentArgs struct { Comment string `json:"comment,omitempty"` } -func (a *ModifyTableCommentArgs) fillJobV1(job *Job) { - job.Args = []any{a.Comment} +func (a *ModifyTableCommentArgs) getArgsV1(*Job) []any { + return []any{a.Comment} +} + +func (a *ModifyTableCommentArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.Comment)) } // GetModifyTableCommentArgs gets the args for ActionModifyTableComment. func GetModifyTableCommentArgs(job *Job) (*ModifyTableCommentArgs, error) { - if job.Version == JobVersion1 { - var comment string - if err := job.DecodeArgs(&comment); err != nil { - return nil, errors.Trace(err) - } - return &ModifyTableCommentArgs{ - Comment: comment, - }, nil - } - - return getOrDecodeArgsV2[*ModifyTableCommentArgs](job) + return getOrDecodeArgs[*ModifyTableCommentArgs](&ModifyTableCommentArgs{}, job) } // ModifyTableCharsetAndCollateArgs is the arguments for ActionModifyTableCharsetAndCollate ddl. @@ -812,22 +699,17 @@ type ModifyTableCharsetAndCollateArgs struct { NeedsOverwriteCols bool `json:"needs_overwrite_cols,omitempty"` } -func (a *ModifyTableCharsetAndCollateArgs) fillJobV1(job *Job) { - job.Args = []any{a.ToCharset, a.ToCollate, a.NeedsOverwriteCols} +func (a *ModifyTableCharsetAndCollateArgs) getArgsV1(*Job) []any { + return []any{a.ToCharset, a.ToCollate, a.NeedsOverwriteCols} +} + +func (a *ModifyTableCharsetAndCollateArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.ToCharset, &a.ToCollate, &a.NeedsOverwriteCols)) } // GetModifyTableCharsetAndCollateArgs gets the args for ActionModifyTableCharsetAndCollate ddl. func GetModifyTableCharsetAndCollateArgs(job *Job) (*ModifyTableCharsetAndCollateArgs, error) { - if job.Version == JobVersion1 { - args := &ModifyTableCharsetAndCollateArgs{} - err := job.DecodeArgs(&args.ToCharset, &args.ToCollate, &args.NeedsOverwriteCols) - if err != nil { - return nil, errors.Trace(err) - } - return args, nil - } - - return getOrDecodeArgsV2[*ModifyTableCharsetAndCollateArgs](job) + return getOrDecodeArgs[*ModifyTableCharsetAndCollateArgs](&ModifyTableCharsetAndCollateArgs{}, job) } // AlterIndexVisibilityArgs is the arguments for ActionAlterIndexVisibility ddl. @@ -836,27 +718,17 @@ type AlterIndexVisibilityArgs struct { Invisible bool `json:"invisible,omitempty"` } -func (a *AlterIndexVisibilityArgs) fillJobV1(job *Job) { - job.Args = []any{a.IndexName, a.Invisible} +func (a *AlterIndexVisibilityArgs) getArgsV1(*Job) []any { + return []any{a.IndexName, a.Invisible} +} + +func (a *AlterIndexVisibilityArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.IndexName, &a.Invisible)) } // GetAlterIndexVisibilityArgs gets the args for AlterIndexVisibility ddl. func GetAlterIndexVisibilityArgs(job *Job) (*AlterIndexVisibilityArgs, error) { - if job.Version == JobVersion1 { - var ( - indexName pmodel.CIStr - invisible bool - ) - if err := job.DecodeArgs(&indexName, &invisible); err != nil { - return nil, errors.Trace(err) - } - return &AlterIndexVisibilityArgs{ - IndexName: indexName, - Invisible: invisible, - }, nil - } - - return getOrDecodeArgsV2[*AlterIndexVisibilityArgs](job) + return getOrDecodeArgs[*AlterIndexVisibilityArgs](&AlterIndexVisibilityArgs{}, job) } // AddForeignKeyArgs is the arguments for ActionAddForeignKey ddl. @@ -865,27 +737,17 @@ type AddForeignKeyArgs struct { FkCheck bool `json:"fk_check,omitempty"` } -func (a *AddForeignKeyArgs) fillJobV1(job *Job) { - job.Args = []any{a.FkInfo, a.FkCheck} +func (a *AddForeignKeyArgs) getArgsV1(*Job) []any { + return []any{a.FkInfo, a.FkCheck} +} + +func (a *AddForeignKeyArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.FkInfo, &a.FkCheck)) } // GetAddForeignKeyArgs get the args for AddForeignKey ddl. func GetAddForeignKeyArgs(job *Job) (*AddForeignKeyArgs, error) { - if job.Version == JobVersion1 { - var ( - fkInfo *FKInfo - fkCheck bool - ) - if err := job.DecodeArgs(&fkInfo, &fkCheck); err != nil { - return nil, errors.Trace(err) - } - return &AddForeignKeyArgs{ - FkInfo: fkInfo, - FkCheck: fkCheck, - }, nil - } - - return getOrDecodeArgsV2[*AddForeignKeyArgs](job) + return getOrDecodeArgs[*AddForeignKeyArgs](&AddForeignKeyArgs{}, job) } // DropForeignKeyArgs is the arguments for DropForeignKey ddl. @@ -893,21 +755,17 @@ type DropForeignKeyArgs struct { FkName pmodel.CIStr `json:"fk_name,omitempty"` } -func (a *DropForeignKeyArgs) fillJobV1(job *Job) { - job.Args = []any{a.FkName} +func (a *DropForeignKeyArgs) getArgsV1(*Job) []any { + return []any{a.FkName} +} + +func (a *DropForeignKeyArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.FkName)) } // GetDropForeignKeyArgs gets the args for DropForeignKey ddl. func GetDropForeignKeyArgs(job *Job) (*DropForeignKeyArgs, error) { - if job.Version == JobVersion1 { - var fkName pmodel.CIStr - if err := job.DecodeArgs(&fkName); err != nil { - return nil, errors.Trace(err) - } - return &DropForeignKeyArgs{FkName: fkName}, nil - } - - return getOrDecodeArgsV2[*DropForeignKeyArgs](job) + return getOrDecodeArgs[*DropForeignKeyArgs](&DropForeignKeyArgs{}, job) } // TableColumnArgs is the arguments for dropping column ddl or Adding column ddl. @@ -925,12 +783,23 @@ type TableColumnArgs struct { PartitionIDs []int64 `json:"partition_ids,omitempty"` } -func (a *TableColumnArgs) fillJobV1(job *Job) { +func (a *TableColumnArgs) getArgsV1(job *Job) []any { if job.Type == ActionDropColumn { - job.Args = []any{a.Col.Name, a.IgnoreExistenceErr, a.IndexIDs, a.PartitionIDs} - } else { - job.Args = []any{a.Col, a.Pos, a.Offset, a.IgnoreExistenceErr} + return []any{a.Col.Name, a.IgnoreExistenceErr, a.IndexIDs, a.PartitionIDs} + } + return []any{a.Col, a.Pos, a.Offset, a.IgnoreExistenceErr} +} + +func (a *TableColumnArgs) decodeV1(job *Job) error { + a.Col = &ColumnInfo{} + a.Pos = &ast.ColumnPosition{} + + // when rollbacking add-columm, it's arguments is same as drop-column + if job.Type == ActionDropColumn || job.State == JobStateRollingback { + return errors.Trace(job.DecodeArgs(&a.Col.Name, &a.IgnoreExistenceErr, &a.IndexIDs, &a.PartitionIDs)) } + // for add column ddl. + return errors.Trace(job.DecodeArgs(a.Col, a.Pos, &a.Offset, &a.IgnoreExistenceErr)) } // FillRollBackArgsForAddColumn fills the args for rollback add column ddl. @@ -946,27 +815,7 @@ func FillRollBackArgsForAddColumn(job *Job, args *TableColumnArgs) { // GetTableColumnArgs gets the args for dropping column ddl or Adding column ddl. func GetTableColumnArgs(job *Job) (*TableColumnArgs, error) { - if job.Version == JobVersion1 { - args := &TableColumnArgs{ - Col: &ColumnInfo{}, - Pos: &ast.ColumnPosition{}, - } - - // when rollbacking add-columm, it's arguments is same as drop-column - if job.Type == ActionDropColumn || job.State == JobStateRollingback { - err := job.DecodeArgs(&args.Col.Name, &args.IgnoreExistenceErr, &args.IndexIDs, &args.PartitionIDs) - if err != nil { - return nil, errors.Trace(err) - } - } else { - // for add column ddl. - if err := job.DecodeArgs(args.Col, args.Pos, &args.Offset, &args.IgnoreExistenceErr); err != nil { - return nil, errors.Trace(err) - } - } - return args, nil - } - return getOrDecodeArgsV2[*TableColumnArgs](job) + return getOrDecodeArgs[*TableColumnArgs](&TableColumnArgs{}, job) } // RenameTablesArgs is the arguments for rename tables job. @@ -974,7 +823,7 @@ type RenameTablesArgs struct { RenameTableInfos []*RenameTableArgs `json:"rename_table_infos,omitempty"` } -func (a *RenameTablesArgs) fillJobV1(job *Job) { +func (a *RenameTablesArgs) getArgsV1(*Job) []any { n := len(a.RenameTableInfos) oldSchemaIDs := make([]int64, n) oldSchemaNames := make([]pmodel.CIStr, n) @@ -993,7 +842,29 @@ func (a *RenameTablesArgs) fillJobV1(job *Job) { } // To make it compatible with previous create metas - job.Args = []any{oldSchemaIDs, newSchemaIDs, newTableNames, tableIDs, oldSchemaNames, oldTableNames} + return []any{oldSchemaIDs, newSchemaIDs, newTableNames, tableIDs, oldSchemaNames, oldTableNames} +} + +func (a *RenameTablesArgs) decodeV1(job *Job) error { + var ( + oldSchemaIDs []int64 + oldSchemaNames []pmodel.CIStr + oldTableNames []pmodel.CIStr + newSchemaIDs []int64 + newTableNames []pmodel.CIStr + tableIDs []int64 + ) + if err := job.DecodeArgs( + &oldSchemaIDs, &newSchemaIDs, &newTableNames, + &tableIDs, &oldSchemaNames, &oldTableNames); err != nil { + return errors.Trace(err) + } + + a.RenameTableInfos = GetRenameTablesArgsFromV1( + oldSchemaIDs, oldSchemaNames, oldTableNames, + newSchemaIDs, newTableNames, tableIDs, + ) + return nil } // GetRenameTablesArgsFromV1 get v2 args from v1 @@ -1004,7 +875,7 @@ func GetRenameTablesArgsFromV1( newSchemaIDs []int64, newTableNames []pmodel.CIStr, tableIDs []int64, -) *RenameTablesArgs { +) []*RenameTableArgs { infos := make([]*RenameTableArgs, 0, len(oldSchemaIDs)) for i, oldSchemaID := range oldSchemaIDs { infos = append(infos, &RenameTableArgs{ @@ -1017,33 +888,12 @@ func GetRenameTablesArgsFromV1( }) } - return &RenameTablesArgs{ - RenameTableInfos: infos, - } + return infos } -// GetRenameTablesArgs gets the rename tables args. +// GetRenameTablesArgs gets the rename-tables args. func GetRenameTablesArgs(job *Job) (*RenameTablesArgs, error) { - if job.Version == JobVersion1 { - var ( - oldSchemaIDs []int64 - oldSchemaNames []pmodel.CIStr - oldTableNames []pmodel.CIStr - newSchemaIDs []int64 - newTableNames []pmodel.CIStr - tableIDs []int64 - ) - if err := job.DecodeArgs( - &oldSchemaIDs, &newSchemaIDs, &newTableNames, - &tableIDs, &oldSchemaNames, &oldTableNames); err != nil { - return nil, errors.Trace(err) - } - - return GetRenameTablesArgsFromV1( - oldSchemaIDs, oldSchemaNames, oldTableNames, - newSchemaIDs, newTableNames, tableIDs), nil - } - return getOrDecodeArgsV2[*RenameTablesArgs](job) + return getOrDecodeArgs[*RenameTablesArgs](&RenameTablesArgs{}, job) } // AlterSequenceArgs is the arguments for alter sequence ddl job. @@ -1052,27 +902,17 @@ type AlterSequenceArgs struct { SeqOptions []*ast.SequenceOption `json:"seq_options,omitempty"` } -func (a *AlterSequenceArgs) fillJobV1(job *Job) { - job.Args = []any{a.Ident, a.SeqOptions} +func (a *AlterSequenceArgs) getArgsV1(*Job) []any { + return []any{a.Ident, a.SeqOptions} +} + +func (a *AlterSequenceArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.Ident, &a.SeqOptions)) } // GetAlterSequenceArgs gets the args for alter Sequence ddl job. func GetAlterSequenceArgs(job *Job) (*AlterSequenceArgs, error) { - if job.Version == JobVersion1 { - var ( - ident ast.Ident - seqOptions []*ast.SequenceOption - ) - if err := job.DecodeArgs(&ident, &seqOptions); err != nil { - return nil, errors.Trace(err) - } - return &AlterSequenceArgs{ - Ident: ident, - SeqOptions: seqOptions, - }, nil - } - - return getOrDecodeArgsV2[*AlterSequenceArgs](job) + return getOrDecodeArgs[*AlterSequenceArgs](&AlterSequenceArgs{}, job) } // ModifyTableAutoIDCacheArgs is the arguments for Modify Table AutoID Cache ddl job. @@ -1080,23 +920,17 @@ type ModifyTableAutoIDCacheArgs struct { NewCache int64 `json:"new_cache,omitempty"` } -func (a *ModifyTableAutoIDCacheArgs) fillJobV1(job *Job) { - job.Args = []any{a.NewCache} +func (a *ModifyTableAutoIDCacheArgs) getArgsV1(*Job) []any { + return []any{a.NewCache} +} + +func (a *ModifyTableAutoIDCacheArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.NewCache)) } // GetModifyTableAutoIDCacheArgs gets the args for modify table autoID cache ddl job. func GetModifyTableAutoIDCacheArgs(job *Job) (*ModifyTableAutoIDCacheArgs, error) { - if job.Version == JobVersion1 { - var newCache int64 - if err := job.DecodeArgs(&newCache); err != nil { - return nil, errors.Trace(err) - } - return &ModifyTableAutoIDCacheArgs{ - NewCache: newCache, - }, nil - } - - return getOrDecodeArgsV2[*ModifyTableAutoIDCacheArgs](job) + return getOrDecodeArgs[*ModifyTableAutoIDCacheArgs](&ModifyTableAutoIDCacheArgs{}, job) } // ShardRowIDArgs is the arguments for shard row ID ddl job. @@ -1104,23 +938,17 @@ type ShardRowIDArgs struct { ShardRowIDBits uint64 `json:"shard_row_id_bits,omitempty"` } -func (a *ShardRowIDArgs) fillJobV1(job *Job) { - job.Args = []any{a.ShardRowIDBits} +func (a *ShardRowIDArgs) getArgsV1(*Job) []any { + return []any{a.ShardRowIDBits} +} + +func (a *ShardRowIDArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.ShardRowIDBits)) } // GetShardRowIDArgs gets the args for shard row ID ddl job. func GetShardRowIDArgs(job *Job) (*ShardRowIDArgs, error) { - if job.Version == JobVersion1 { - var val uint64 - if err := job.DecodeArgs(&val); err != nil { - return nil, errors.Trace(err) - } - return &ShardRowIDArgs{ - ShardRowIDBits: val, - }, nil - } - - return getOrDecodeArgsV2[*ShardRowIDArgs](job) + return getOrDecodeArgs[*ShardRowIDArgs](&ShardRowIDArgs{}, job) } // AlterTTLInfoArgs is the arguments for alter ttl info job. @@ -1130,65 +958,55 @@ type AlterTTLInfoArgs struct { TTLCronJobSchedule *string `json:"ttl_cron_job_schedule,omitempty"` } -func (a *AlterTTLInfoArgs) fillJobV1(job *Job) { - job.Args = []any{a.TTLInfo, a.TTLEnable, a.TTLCronJobSchedule} +func (a *AlterTTLInfoArgs) getArgsV1(*Job) []any { + return []any{a.TTLInfo, a.TTLEnable, a.TTLCronJobSchedule} +} + +func (a *AlterTTLInfoArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.TTLInfo, &a.TTLEnable, &a.TTLCronJobSchedule)) } // GetAlterTTLInfoArgs gets the args for alter ttl info job. func GetAlterTTLInfoArgs(job *Job) (*AlterTTLInfoArgs, error) { - if job.Version == JobVersion1 { - args := &AlterTTLInfoArgs{} - if err := job.DecodeArgs(&args.TTLInfo, &args.TTLEnable, &args.TTLCronJobSchedule); err != nil { - return nil, errors.Trace(err) - } - return args, nil - } + return getOrDecodeArgs[*AlterTTLInfoArgs](&AlterTTLInfoArgs{}, job) +} - return getOrDecodeArgsV2[*AlterTTLInfoArgs](job) +// CheckConstraintArgs is the arguments for both AlterCheckConstraint and DropCheckConstraint job. +type CheckConstraintArgs struct { + ConstraintName pmodel.CIStr `json:"constraint_name,omitempty"` + Enforced bool `json:"enforced,omitempty"` +} + +func (a *CheckConstraintArgs) getArgsV1(*Job) []any { + return []any{a.ConstraintName, a.Enforced} +} + +func (a *CheckConstraintArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.ConstraintName, &a.Enforced)) } // GetCheckConstraintArgs gets the AlterCheckConstraint args. func GetCheckConstraintArgs(job *Job) (*CheckConstraintArgs, error) { - if job.Version == JobVersion1 { - var ( - constraintName pmodel.CIStr - enforced bool - ) - err := job.DecodeArgs(&constraintName, &enforced) - if err != nil { - return nil, errors.Trace(err) - } - return &CheckConstraintArgs{ - ConstraintName: constraintName, - Enforced: enforced, - }, nil - } - - return getOrDecodeArgsV2[*CheckConstraintArgs](job) + return getOrDecodeArgs[*CheckConstraintArgs](&CheckConstraintArgs{}, job) } -// AddCheckConstraintArgs is the arguemnt for add check constraint +// AddCheckConstraintArgs is the args for add check constraint type AddCheckConstraintArgs struct { Constraint *ConstraintInfo `json:"constraint_info"` } -func (a *AddCheckConstraintArgs) fillJobV1(job *Job) { - job.Args = []any{a.Constraint} +func (a *AddCheckConstraintArgs) getArgsV1(*Job) []any { + return []any{a.Constraint} +} + +func (a *AddCheckConstraintArgs) decodeV1(job *Job) error { + a.Constraint = &ConstraintInfo{} + return errors.Trace(job.DecodeArgs(&a.Constraint)) } // GetAddCheckConstraintArgs gets the AddCheckConstraint args. func GetAddCheckConstraintArgs(job *Job) (*AddCheckConstraintArgs, error) { - if job.Version == JobVersion1 { - var constraintInfo ConstraintInfo - err := job.DecodeArgs(&constraintInfo) - if err != nil { - return nil, errors.Trace(err) - } - return &AddCheckConstraintArgs{ - Constraint: &constraintInfo, - }, nil - } - return getOrDecodeArgsV2[*AddCheckConstraintArgs](job) + return getOrDecodeArgs[*AddCheckConstraintArgs](&AddCheckConstraintArgs{}, job) } // AlterTablePlacementArgs is the arguments for alter table placements ddl job. @@ -1196,24 +1014,18 @@ type AlterTablePlacementArgs struct { PlacementPolicyRef *PolicyRefInfo `json:"placement_policy_ref,omitempty"` } -func (a *AlterTablePlacementArgs) fillJobV1(job *Job) { - job.Args = []any{a.PlacementPolicyRef} +func (a *AlterTablePlacementArgs) getArgsV1(*Job) []any { + return []any{a.PlacementPolicyRef} +} + +func (a *AlterTablePlacementArgs) decodeV1(job *Job) error { + // when the target policy is 'default', policy info is nil + return errors.Trace(job.DecodeArgs(&a.PlacementPolicyRef)) } // GetAlterTablePlacementArgs gets the args for alter table placements ddl job. func GetAlterTablePlacementArgs(job *Job) (*AlterTablePlacementArgs, error) { - if job.Version == JobVersion1 { - // when the target policy is 'default', policy info is nil - var placementPolicyRef *PolicyRefInfo - if err := job.DecodeArgs(&placementPolicyRef); err != nil { - return nil, errors.Trace(err) - } - return &AlterTablePlacementArgs{ - PlacementPolicyRef: placementPolicyRef, - }, nil - } - - return getOrDecodeArgsV2[*AlterTablePlacementArgs](job) + return getOrDecodeArgs[*AlterTablePlacementArgs](&AlterTablePlacementArgs{}, job) } // SetTiFlashReplicaArgs is the arguments for setting TiFlash replica ddl. @@ -1221,21 +1033,17 @@ type SetTiFlashReplicaArgs struct { TiflashReplica ast.TiFlashReplicaSpec `json:"tiflash_replica,omitempty"` } -func (a *SetTiFlashReplicaArgs) fillJobV1(job *Job) { - job.Args = []any{a.TiflashReplica} +func (a *SetTiFlashReplicaArgs) getArgsV1(*Job) []any { + return []any{a.TiflashReplica} +} + +func (a *SetTiFlashReplicaArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.TiflashReplica)) } // GetSetTiFlashReplicaArgs gets the args for setting TiFlash replica ddl. func GetSetTiFlashReplicaArgs(job *Job) (*SetTiFlashReplicaArgs, error) { - if job.Version == JobVersion1 { - tiflashReplica := ast.TiFlashReplicaSpec{} - if err := job.DecodeArgs(&tiflashReplica); err != nil { - return nil, errors.Trace(err) - } - return &SetTiFlashReplicaArgs{TiflashReplica: tiflashReplica}, nil - } - - return getOrDecodeArgsV2[*SetTiFlashReplicaArgs](job) + return getOrDecodeArgs[*SetTiFlashReplicaArgs](&SetTiFlashReplicaArgs{}, job) } // UpdateTiFlashReplicaStatusArgs is the arguments for updating TiFlash replica status ddl. @@ -1244,27 +1052,17 @@ type UpdateTiFlashReplicaStatusArgs struct { PhysicalID int64 `json:"physical_id,omitempty"` } -func (a *UpdateTiFlashReplicaStatusArgs) fillJobV1(job *Job) { - job.Args = []any{a.Available, a.PhysicalID} +func (a *UpdateTiFlashReplicaStatusArgs) getArgsV1(*Job) []any { + return []any{a.Available, a.PhysicalID} +} + +func (a *UpdateTiFlashReplicaStatusArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.Available, &a.PhysicalID)) } // GetUpdateTiFlashReplicaStatusArgs gets the args for updating TiFlash replica status ddl. func GetUpdateTiFlashReplicaStatusArgs(job *Job) (*UpdateTiFlashReplicaStatusArgs, error) { - if job.Version == JobVersion1 { - var ( - available bool - physicalID int64 - ) - if err := job.DecodeArgs(&available, &physicalID); err != nil { - return nil, errors.Trace(err) - } - return &UpdateTiFlashReplicaStatusArgs{ - Available: available, - PhysicalID: physicalID, - }, nil - } - - return getOrDecodeArgsV2[*UpdateTiFlashReplicaStatusArgs](job) + return getOrDecodeArgs[*UpdateTiFlashReplicaStatusArgs](&UpdateTiFlashReplicaStatusArgs{}, job) } // LockTablesArgs is the argument for LockTables. @@ -1277,47 +1075,54 @@ type LockTablesArgs struct { IsCleanup bool `json:"is_cleanup:omitempty"` } -func (a *LockTablesArgs) fillJobV1(job *Job) { - job.Args = []any{a} +func (a *LockTablesArgs) getArgsV1(*Job) []any { + return []any{a} +} + +func (a *LockTablesArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(a)) } // GetLockTablesArgs get the LockTablesArgs argument. func GetLockTablesArgs(job *Job) (*LockTablesArgs, error) { - var args *LockTablesArgs - var err error - - if job.Version == JobVersion1 { - err = job.DecodeArgs(&args) - } else { - args, err = getOrDecodeArgsV2[*LockTablesArgs](job) - } - - if err != nil { - return nil, errors.Trace(err) - } - return args, nil + return getOrDecodeArgs[*LockTablesArgs](&LockTablesArgs{}, job) } // RepairTableArgs is the argument for repair table type RepairTableArgs struct { - *TableInfo `json:"table_info"` + TableInfo *TableInfo `json:"table_info"` +} + +func (a *RepairTableArgs) getArgsV1(*Job) []any { + return []any{a.TableInfo} } -func (a *RepairTableArgs) fillJobV1(job *Job) { - job.Args = []any{a.TableInfo} +func (a *RepairTableArgs) decodeV1(job *Job) error { + return errors.Trace(job.DecodeArgs(&a.TableInfo)) } // GetRepairTableArgs get the repair table args. func GetRepairTableArgs(job *Job) (*RepairTableArgs, error) { - if job.Version == JobVersion1 { - var tblInfo *TableInfo - if err := job.DecodeArgs(&tblInfo); err != nil { - return nil, errors.Trace(err) - } - return &RepairTableArgs{tblInfo}, nil - } + return getOrDecodeArgs[*RepairTableArgs](&RepairTableArgs{}, job) +} + +// AlterTableAttributesArgs is the argument for alter table attributes +type AlterTableAttributesArgs struct { + LabelRule *pdhttp.LabelRule `json:"label_rule,omitempty"` +} - return getOrDecodeArgsV2[*RepairTableArgs](job) +func (a *AlterTableAttributesArgs) getArgsV1(*Job) []any { + return []any{a.LabelRule} +} + +func (a *AlterTableAttributesArgs) decodeV1(job *Job) error { + a.LabelRule = &pdhttp.LabelRule{} + return errors.Trace(job.DecodeArgs(a.LabelRule)) +} + +// GetAlterTableAttributesArgs get alter table attribute args from job. +func GetAlterTableAttributesArgs(job *Job) (*AlterTableAttributesArgs, error) { + return getOrDecodeArgs[*AlterTableAttributesArgs](&AlterTableAttributesArgs{}, job) } // RecoverArgs is the argument for recover table/schema. @@ -1326,36 +1131,34 @@ type RecoverArgs struct { CheckFlag int64 `json:"check_flag,omitempty"` } -func (a *RecoverArgs) fillJobV1(job *Job) { +func (a *RecoverArgs) getArgsV1(job *Job) []any { if job.Type == ActionRecoverTable { - job.Args = []any{a.RecoverTableInfos()[0], a.CheckFlag} - } else { - job.Args = []any{a.RecoverInfo, a.CheckFlag} + return []any{a.RecoverTableInfos()[0], a.CheckFlag} } + return []any{a.RecoverInfo, a.CheckFlag} } -// AlterTableAttributesArgs is the argument for alter table attributes -type AlterTableAttributesArgs struct { - LabelRule *pdhttp.LabelRule `json:"label_rule,omitempty"` -} - -func (a *AlterTableAttributesArgs) fillJobV1(job *Job) { - job.Args = []any{a.LabelRule} -} - -// GetAlterTableAttributesArgs get alter table attribute args from job. -func GetAlterTableAttributesArgs(job *Job) (*AlterTableAttributesArgs, error) { - if job.Version == JobVersion1 { - labelRule := &pdhttp.LabelRule{} - if err := job.DecodeArgs(labelRule); err != nil { - return nil, errors.Trace(err) +func (a *RecoverArgs) decodeV1(job *Job) error { + var ( + recoverTableInfo *RecoverTableInfo + recoverSchemaInfo = &RecoverSchemaInfo{} + recoverCheckFlag int64 + ) + if job.Type == ActionRecoverTable { + err := job.DecodeArgs(&recoverTableInfo, &recoverCheckFlag) + if err != nil { + return errors.Trace(err) + } + recoverSchemaInfo.RecoverTableInfos = []*RecoverTableInfo{recoverTableInfo} + } else { + err := job.DecodeArgs(recoverSchemaInfo, &recoverCheckFlag) + if err != nil { + return errors.Trace(err) } - return &AlterTableAttributesArgs{ - LabelRule: labelRule, - }, nil } - - return getOrDecodeArgsV2[*AlterTableAttributesArgs](job) + a.RecoverInfo = recoverSchemaInfo + a.CheckFlag = recoverCheckFlag + return nil } // RecoverTableInfos get all the recover infos. @@ -1365,33 +1168,7 @@ func (a *RecoverArgs) RecoverTableInfos() []*RecoverTableInfo { // GetRecoverArgs get the recover table/schema args. func GetRecoverArgs(job *Job) (*RecoverArgs, error) { - if job.Version == JobVersion1 { - var ( - recoverTableInfo *RecoverTableInfo - recoverSchemaInfo = &RecoverSchemaInfo{} - recoverCheckFlag int64 - ) - - if job.Type == ActionRecoverTable { - err := job.DecodeArgs(&recoverTableInfo, &recoverCheckFlag) - if err != nil { - return nil, errors.Trace(err) - } - recoverSchemaInfo.RecoverTableInfos = []*RecoverTableInfo{recoverTableInfo} - } else { - err := job.DecodeArgs(recoverSchemaInfo, &recoverCheckFlag) - if err != nil { - return nil, errors.Trace(err) - } - } - - return &RecoverArgs{ - RecoverInfo: recoverSchemaInfo, - CheckFlag: recoverCheckFlag, - }, nil - } - - return getOrDecodeArgsV2[*RecoverArgs](job) + return getOrDecodeArgs[*RecoverArgs](&RecoverArgs{}, job) } // PlacementPolicyArgs is the argument for create/alter/drop placement policy @@ -1404,40 +1181,29 @@ type PlacementPolicyArgs struct { PolicyID int64 `json:"policy_id"` } -func (a *PlacementPolicyArgs) fillJobV1(job *Job) { +func (a *PlacementPolicyArgs) getArgsV1(job *Job) []any { if job.Type == ActionCreatePlacementPolicy { - job.Args = []any{a.Policy, a.ReplaceOnExist} + return []any{a.Policy, a.ReplaceOnExist} } else if job.Type == ActionAlterPlacementPolicy { - job.Args = []any{a.Policy} - } else { - intest.Assert(job.Type == ActionDropPlacementPolicy, "Invalid job type for PlacementPolicyArgs") - job.Args = []any{a.PolicyName} + return []any{a.Policy} } + return []any{a.PolicyName} } -// GetPlacementPolicyArgs gets the placement policy args. -func GetPlacementPolicyArgs(job *Job) (*PlacementPolicyArgs, error) { - if job.Version == JobVersion1 { - args := &PlacementPolicyArgs{PolicyID: job.SchemaID} - var err error +func (a *PlacementPolicyArgs) decodeV1(job *Job) error { + a.PolicyID = job.SchemaID - if job.Type == ActionCreatePlacementPolicy { - err = job.DecodeArgs(&args.Policy, &args.ReplaceOnExist) - } else if job.Type == ActionAlterPlacementPolicy { - err = job.DecodeArgs(&args.Policy) - } else { - intest.Assert(job.Type == ActionDropPlacementPolicy, "Invalid job type for PlacementPolicyArgs") - err = job.DecodeArgs(&args.PolicyName) - } - - if err != nil { - return nil, errors.Trace(err) - } - - return args, err + if job.Type == ActionCreatePlacementPolicy { + return errors.Trace(job.DecodeArgs(&a.Policy, &a.ReplaceOnExist)) + } else if job.Type == ActionAlterPlacementPolicy { + return errors.Trace(job.DecodeArgs(&a.Policy)) } + return errors.Trace(job.DecodeArgs(&a.PolicyName)) +} - return getOrDecodeArgsV2[*PlacementPolicyArgs](job) +// GetPlacementPolicyArgs gets the placement policy args. +func GetPlacementPolicyArgs(job *Job) (*PlacementPolicyArgs, error) { + return getOrDecodeArgs[*PlacementPolicyArgs](&PlacementPolicyArgs{}, job) } // SetDefaultValueArgs is the argument for setting default value ddl. @@ -1445,22 +1211,18 @@ type SetDefaultValueArgs struct { Col *ColumnInfo `json:"column_info,omitempty"` } -func (a *SetDefaultValueArgs) fillJobV1(job *Job) { - job.Args = []any{a.Col} +func (a *SetDefaultValueArgs) getArgsV1(*Job) []any { + return []any{a.Col} +} + +func (a *SetDefaultValueArgs) decodeV1(job *Job) error { + a.Col = &ColumnInfo{} + return errors.Trace(job.DecodeArgs(a.Col)) } // GetSetDefaultValueArgs get the args for setting default value ddl. func GetSetDefaultValueArgs(job *Job) (*SetDefaultValueArgs, error) { - if job.Version == JobVersion1 { - col := &ColumnInfo{} - err := job.DecodeArgs(col) - if err != nil { - return nil, errors.Trace(err) - } - return &SetDefaultValueArgs{Col: col}, nil - } - - return getOrDecodeArgsV2[*SetDefaultValueArgs](job) + return getOrDecodeArgs[*SetDefaultValueArgs](&SetDefaultValueArgs{}, job) } // KeyRange is copied from kv.KeyRange to avoid cycle import. @@ -1484,7 +1246,7 @@ type FlashbackClusterArgs struct { FlashbackKeyRanges []KeyRange `json:"key_ranges,omitempty"` } -func (a *FlashbackClusterArgs) fillJobV1(job *Job) { +func (a *FlashbackClusterArgs) getArgsV1(*Job) []any { enableAutoAnalyze := "ON" superReadOnly := "ON" enableTTLJob := "ON" @@ -1498,37 +1260,38 @@ func (a *FlashbackClusterArgs) fillJobV1(job *Job) { enableTTLJob = "OFF" } - job.Args = []any{ - a.FlashbackTS, a.PDScheduleValue, a.EnableGC, enableAutoAnalyze, superReadOnly, - a.LockedRegionCnt, a.StartTS, a.CommitTS, enableTTLJob, a.FlashbackKeyRanges, + return []any{ + a.FlashbackTS, a.PDScheduleValue, a.EnableGC, + enableAutoAnalyze, superReadOnly, a.LockedRegionCnt, + a.StartTS, a.CommitTS, enableTTLJob, a.FlashbackKeyRanges, } } -// GetFlashbackClusterArgs get the flashback cluster argument from job. -func GetFlashbackClusterArgs(job *Job) (*FlashbackClusterArgs, error) { - if job.Version == JobVersion1 { - args := &FlashbackClusterArgs{} - var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string - - if err := job.DecodeArgs( - &args.FlashbackTS, &args.PDScheduleValue, &args.EnableGC, - &autoAnalyzeValue, &readOnlyValue, &args.LockedRegionCnt, - &args.StartTS, &args.CommitTS, &ttlJobEnableValue, &args.FlashbackKeyRanges); err != nil { - return nil, errors.Trace(err) - } +func (a *FlashbackClusterArgs) decodeV1(job *Job) error { + var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string - if autoAnalyzeValue == "ON" { - args.EnableAutoAnalyze = true - } - if readOnlyValue == "ON" { - args.SuperReadOnly = true - } - if ttlJobEnableValue == "ON" { - args.EnableTTLJob = true - } + if err := job.DecodeArgs( + &a.FlashbackTS, &a.PDScheduleValue, &a.EnableGC, + &autoAnalyzeValue, &readOnlyValue, &a.LockedRegionCnt, + &a.StartTS, &a.CommitTS, &ttlJobEnableValue, &a.FlashbackKeyRanges, + ); err != nil { + return errors.Trace(err) + } - return args, nil + if autoAnalyzeValue == "ON" { + a.EnableAutoAnalyze = true + } + if readOnlyValue == "ON" { + a.SuperReadOnly = true } + if ttlJobEnableValue == "ON" { + a.EnableTTLJob = true + } + + return nil +} - return getOrDecodeArgsV2[*FlashbackClusterArgs](job) +// GetFlashbackClusterArgs get the flashback cluster argument from job. +func GetFlashbackClusterArgs(job *Job) (*FlashbackClusterArgs, error) { + return getOrDecodeArgs[*FlashbackClusterArgs](&FlashbackClusterArgs{}, job) } diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index dfd8a9820e4b9..1a718cb4a81fd 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -497,8 +497,12 @@ func TestUpdateRenameTableArgs(t *testing.T) { func TestGetRenameTablesArgs(t *testing.T) { inArgs := &RenameTablesArgs{ RenameTableInfos: []*RenameTableArgs{ - {1, model.CIStr{O: "db1", L: "db1"}, model.CIStr{O: "tb3", L: "tb3"}, model.CIStr{O: "tb1", L: "tb1"}, 3, 100}, - {2, model.CIStr{O: "db2", L: "db2"}, model.CIStr{O: "tb2", L: "tb2"}, model.CIStr{O: "tb4", L: "tb4"}, 3, 101}, + {OldSchemaID: 1, OldSchemaName: model.CIStr{O: "db1", L: "db1"}, + NewTableName: model.CIStr{O: "tb3", L: "tb3"}, OldTableName: model.CIStr{O: "tb1", L: "tb1"}, + NewSchemaID: 3, TableID: 100}, + {OldSchemaID: 2, OldSchemaName: model.CIStr{O: "db2", L: "db2"}, + NewTableName: model.CIStr{O: "tb2", L: "tb2"}, OldTableName: model.CIStr{O: "tb4", L: "tb4"}, + NewSchemaID: 3, TableID: 101}, }, } for _, v := range []JobVersion{JobVersion1, JobVersion2} {