diff --git a/pkg/ddl/db_integration_test.go b/pkg/ddl/db_integration_test.go index ac5578c0f231a..057f7985216e4 100644 --- a/pkg/ddl/db_integration_test.go +++ b/pkg/ddl/db_integration_test.go @@ -3069,11 +3069,11 @@ func TestIssue52680(t *testing.T) { testSteps := []struct { sql string - expect meta.AutoIDGroup + expect model.AutoIDGroup }{ - {sql: "", expect: meta.AutoIDGroup{RowID: 0, IncrementID: 4000, RandomID: 0}}, - {sql: "drop table issue52680", expect: meta.AutoIDGroup{RowID: 0, IncrementID: 0, RandomID: 0}}, - {sql: "recover table issue52680", expect: meta.AutoIDGroup{RowID: 0, IncrementID: 4000, RandomID: 0}}, + {sql: "", expect: model.AutoIDGroup{RowID: 0, IncrementID: 4000, RandomID: 0}}, + {sql: "drop table issue52680", expect: model.AutoIDGroup{RowID: 0, IncrementID: 0, RandomID: 0}}, + {sql: "recover table issue52680", expect: model.AutoIDGroup{RowID: 0, IncrementID: 4000, RandomID: 0}}, } for _, step := range testSteps { if step.sql != "" { diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index b03b2a0c0c9cc..b6a5134f8f447 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -51,7 +51,6 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/owner" - pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" @@ -78,9 +77,6 @@ const ( reorgWorkerCnt = 10 generalWorkerCnt = 10 - - // checkFlagIndexInJobArgs is the recoverCheckFlag index used in RecoverTable/RecoverSchema job arg list. - checkFlagIndexInJobArgs = 1 ) const ( @@ -1033,16 +1029,16 @@ func (d *ddl) cleanDeadTableLock(unlockTables []model.TableLockTpInfo, se model. 
if len(unlockTables) == 0 { return nil } - arg := &LockTablesArg{ + args := &model.LockTablesArgs{ UnlockTables: unlockTables, SessionInfo: se, } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: unlockTables[0].SchemaID, TableID: unlockTables[0].TableID, Type: model.ActionUnlockTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{arg}, } ctx, err := d.sessPool.Get() @@ -1050,7 +1046,7 @@ func (d *ddl) cleanDeadTableLock(unlockTables []model.TableLockTpInfo, se model. return err } defer d.sessPool.Put(ctx) - err = d.executor.DoDDLJob(ctx, job) + err = d.executor.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -1100,30 +1096,6 @@ func (d *ddl) SwitchMDL(enable bool) error { return nil } -// RecoverInfo contains information needed by DDL.RecoverTable. -type RecoverInfo struct { - SchemaID int64 - TableInfo *model.TableInfo - DropJobID int64 - SnapshotTS uint64 - AutoIDs meta.AutoIDGroup - OldSchemaName string - OldTableName string -} - -// RecoverSchemaInfo contains information needed by DDL.RecoverSchema. -type RecoverSchemaInfo struct { - *model.DBInfo - RecoverTabsInfo []*RecoverInfo - // LoadTablesOnExecute is the new logic to avoid a large RecoverTabsInfo can't be - // persisted. If it's true, DDL owner will recover RecoverTabsInfo instead of the - // job submit node. - LoadTablesOnExecute bool - DropJobID int64 - SnapshotTS uint64 - OldSchemaName pmodel.CIStr -} - // delayForAsyncCommit sleeps `SafeWindow + AllowedClockDrift` before a DDL job finishes. // It should be called before any DDL that could break data consistency. // This provides a safe window for async commit and 1PC to commit with an old schema. 
diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 0dd212e176220..2c3bcc85b73c8 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -104,8 +104,8 @@ type Executor interface { CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) - RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) - RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *RecoverSchemaInfo) error + RecoverTable(ctx sessionctx.Context, recoverTableInfo *model.RecoverTableInfo) (err error) + RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *model.RecoverSchemaInfo) error DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt) error @@ -797,7 +797,7 @@ func (e *executor) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt return nil } -func (e *executor) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *RecoverSchemaInfo) error { +func (e *executor) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *model.RecoverSchemaInfo) error { involvedSchemas := []model.InvolvingSchemaInfo{{ Database: recoverSchemaInfo.DBInfo.Name.L, Table: model.InvolvingAll, @@ -810,14 +810,19 @@ func (e *executor) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *Reco } recoverSchemaInfo.State = model.StateNone job := &model.Job{ + Version: model.GetJobVerInUse(), Type: model.ActionRecoverSchema, BinlogInfo: &model.HistoryInfo{}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, - Args: []any{recoverSchemaInfo, recoverCheckFlagNone}, InvolvingSchemaInfo: involvedSchemas, SQLMode: ctx.GetSessionVars().SQLMode, } - err := e.DoDDLJob(ctx, job) + + args := &model.RecoverArgs{ + RecoverInfo: recoverSchemaInfo, + CheckFlag: recoverCheckFlagNone, + } + err := 
e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -1443,9 +1448,9 @@ func (e *executor) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) return errors.Trace(err) } -func (e *executor) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) { +func (e *executor) RecoverTable(ctx sessionctx.Context, recoverTableInfo *model.RecoverTableInfo) (err error) { is := e.infoCache.GetLatest() - schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo + schemaID, tbInfo := recoverTableInfo.SchemaID, recoverTableInfo.TableInfo // Check schema exist. schema, ok := is.SchemaByID(schemaID) if !ok { @@ -1461,28 +1466,33 @@ func (e *executor) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo // for "flashback table xxx to yyy" // Note: this case only allow change table name, schema remains the same. var involvedSchemas []model.InvolvingSchemaInfo - if recoverInfo.OldTableName != tbInfo.Name.L { + if recoverTableInfo.OldTableName != tbInfo.Name.L { involvedSchemas = []model.InvolvingSchemaInfo{ - {Database: schema.Name.L, Table: recoverInfo.OldTableName}, + {Database: schema.Name.L, Table: recoverTableInfo.OldTableName}, {Database: schema.Name.L, Table: tbInfo.Name.L}, } } tbInfo.State = model.StateNone job := &model.Job{ - SchemaID: schemaID, - TableID: tbInfo.ID, - SchemaName: schema.Name.L, - TableName: tbInfo.Name.L, - + Version: model.GetJobVerInUse(), + SchemaID: schemaID, + TableID: tbInfo.ID, + SchemaName: schema.Name.L, + TableName: tbInfo.Name.L, Type: model.ActionRecoverTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{recoverInfo, recoverCheckFlagNone}, InvolvingSchemaInfo: involvedSchemas, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - err = e.DoDDLJob(ctx, job) + + args := &model.RecoverArgs{ + RecoverInfo: &model.RecoverSchemaInfo{ + RecoverTableInfos: []*model.RecoverTableInfo{recoverTableInfo}, + }, + CheckFlag: recoverCheckFlagNone} + err = 
e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -5318,24 +5328,24 @@ func (e *executor) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) } unlockTables := ctx.GetAllTableLocks() - arg := &LockTablesArg{ + args := &model.LockTablesArgs{ LockTables: lockTables, UnlockTables: unlockTables, SessionInfo: sessionInfo, } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: lockTables[0].SchemaID, TableID: lockTables[0].TableID, Type: model.ActionLockTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{arg}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, InvolvingSchemaInfo: involveSchemaInfo, SQLMode: ctx.GetSessionVars().SQLMode, } // AddTableLock here is avoiding this job was executed successfully but the session was killed before return. ctx.AddTableLock(lockTables) - err := e.DoDDLJob(ctx, job) + err := e.doDDLJob2(ctx, job, args) if err == nil { ctx.ReleaseTableLocks(unlockTables) ctx.AddTableLock(lockTables) @@ -5348,7 +5358,7 @@ func (e *executor) UnlockTables(ctx sessionctx.Context, unlockTables []model.Tab if len(unlockTables) == 0 { return nil } - arg := &LockTablesArg{ + args := &model.LockTablesArgs{ UnlockTables: unlockTables, SessionInfo: model.SessionInfo{ ServerID: e.uuid, @@ -5373,17 +5383,17 @@ func (e *executor) UnlockTables(ctx sessionctx.Context, unlockTables []model.Tab }) } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: unlockTables[0].SchemaID, TableID: unlockTables[0].TableID, Type: model.ActionUnlockTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{arg}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, InvolvingSchemaInfo: involveSchemaInfo, SQLMode: ctx.GetSessionVars().SQLMode, } - err := e.DoDDLJob(ctx, job) + err := e.doDDLJob2(ctx, job, args) if err == nil { ctx.ReleaseAllTableLocks() } @@ -5440,37 +5450,27 @@ func (e *executor) CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableN return nil } - arg := &LockTablesArg{ + args := &model.LockTablesArgs{ UnlockTables: 
cleanupTables, IsCleanup: true, } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: cleanupTables[0].SchemaID, TableID: cleanupTables[0].TableID, Type: model.ActionUnlockTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{arg}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, InvolvingSchemaInfo: involvingSchemaInfo, SQLMode: ctx.GetSessionVars().SQLMode, } - err := e.DoDDLJob(ctx, job) + err := e.doDDLJob2(ctx, job, args) if err == nil { ctx.ReleaseTableLocks(cleanupTables) } return errors.Trace(err) } -// LockTablesArg is the argument for LockTables, export for test. -type LockTablesArg struct { - LockTables []model.TableLockTpInfo - IndexOfLock int - UnlockTables []model.TableLockTpInfo - IndexOfUnlock int - SessionInfo model.SessionInfo - IsCleanup bool -} - func (e *executor) RepairTable(ctx sessionctx.Context, createStmt *ast.CreateTableStmt) error { // Existence of DB and table has been checked in the preprocessor. oldTableInfo, ok := (ctx.Value(domainutil.RepairedTable)).(*model.TableInfo) @@ -5535,17 +5535,19 @@ func (e *executor) RepairTable(ctx sessionctx.Context, createStmt *ast.CreateTab newTableInfo.State = model.StateNone job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: oldDBInfo.ID, TableID: newTableInfo.ID, SchemaName: oldDBInfo.Name.L, TableName: newTableInfo.Name.L, Type: model.ActionRepairTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{newTableInfo}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - err = e.DoDDLJob(ctx, job) + + args := &model.RepairTableArgs{TableInfo: newTableInfo} + err = e.doDDLJob2(ctx, job, args) if err == nil { // Remove the old TableInfo from repairInfo before domain reload. 
domainutil.RepairInfo.RemoveFromRepairInfo(oldDBInfo.Name.L, oldTableInfo.Name.L) diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index c6fc8b723bde2..7bd921c067cdc 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -393,15 +393,11 @@ func (w *worker) deleteDDLJob(job *model.Job) error { } func finishRecoverTable(w *worker, job *model.Job) error { - var ( - recoverInfo *RecoverInfo - recoverTableCheckFlag int64 - ) - err := job.DecodeArgs(&recoverInfo, &recoverTableCheckFlag) + args, err := model.GetRecoverArgs(job) if err != nil { return errors.Trace(err) } - if recoverTableCheckFlag == recoverCheckFlagEnableGC { + if args.CheckFlag == recoverCheckFlagEnableGC { err = enableGC(w) if err != nil { return errors.Trace(err) @@ -411,15 +407,11 @@ func finishRecoverTable(w *worker, job *model.Job) error { } func finishRecoverSchema(w *worker, job *model.Job) error { - var ( - recoverSchemaInfo *RecoverSchemaInfo - recoverSchemaCheckFlag int64 - ) - err := job.DecodeArgs(&recoverSchemaInfo, &recoverSchemaCheckFlag) + args, err := model.GetRecoverArgs(job) if err != nil { return errors.Trace(err) } - if recoverSchemaCheckFlag == recoverCheckFlagEnableGC { + if args.CheckFlag == recoverCheckFlagEnableGC { err = enableGC(w) if err != nil { return errors.Trace(err) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 2396465bf5659..29c631ca43646 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -2833,7 +2833,7 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, t *meta.Meta, job // Set both tables to the maximum auto IDs between normal table and partitioned table. // TODO: Fix the issue of big transactions during EXCHANGE PARTITION with AutoID. 
// Similar to https://github.com/pingcap/tidb/issues/46904 - newAutoIDs := meta.AutoIDGroup{ + newAutoIDs := model.AutoIDGroup{ RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID), IncrementID: mathutil.Max(ptAutoIDs.IncrementID, ntAutoIDs.IncrementID), RandomID: mathutil.Max(ptAutoIDs.RandomID, ntAutoIDs.RandomID), diff --git a/pkg/ddl/schema.go b/pkg/ddl/schema.go index d5f5001db55f1..ad34f3d92ea76 100644 --- a/pkg/ddl/schema.go +++ b/pkg/ddl/schema.go @@ -230,15 +230,14 @@ func onDropSchema(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, } func (w *worker) onRecoverSchema(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { - var ( - recoverSchemaInfo *RecoverSchemaInfo - recoverSchemaCheckFlag int64 - ) - if err := job.DecodeArgs(&recoverSchemaInfo, &recoverSchemaCheckFlag); err != nil { + args, err := model.GetRecoverArgs(job) + if err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } + recoverSchemaInfo := args.RecoverInfo + schemaInfo := recoverSchemaInfo.DBInfo // check GC and safe point gcEnable, err := checkGCEnable(w) @@ -251,10 +250,12 @@ func (w *worker) onRecoverSchema(jobCtx *jobContext, t *meta.Meta, job *model.Jo // none -> write only // check GC enable and update flag. 
if gcEnable { - job.Args[checkFlagIndexInJobArgs] = recoverCheckFlagEnableGC + args.CheckFlag = recoverCheckFlagEnableGC } else { - job.Args[checkFlagIndexInJobArgs] = recoverCheckFlagDisableGC + args.CheckFlag = recoverCheckFlagDisableGC } + job.FillArgs(args) + schemaInfo.State = model.StateWriteOnly job.SchemaState = model.StateWriteOnly case model.StateWriteOnly: @@ -268,7 +269,7 @@ func (w *worker) onRecoverSchema(jobCtx *jobContext, t *meta.Meta, job *model.Jo } } - recoverTbls := recoverSchemaInfo.RecoverTabsInfo + recoverTbls := recoverSchemaInfo.RecoverTableInfos if recoverSchemaInfo.LoadTablesOnExecute { sid := recoverSchemaInfo.DBInfo.ID snap := w.store.GetSnapshot(kv.NewVersion(recoverSchemaInfo.SnapshotTS)) @@ -278,14 +279,14 @@ func (w *worker) onRecoverSchema(jobCtx *jobContext, t *meta.Meta, job *model.Jo job.State = model.JobStateCancelled return ver, errors.Trace(err2) } - recoverTbls = make([]*RecoverInfo, 0, len(tables)) + recoverTbls = make([]*model.RecoverTableInfo, 0, len(tables)) for _, tblInfo := range tables { autoIDs, err3 := snapMeta.GetAutoIDAccessors(sid, tblInfo.ID).Get() if err3 != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err3) } - recoverTbls = append(recoverTbls, &RecoverInfo{ + recoverTbls = append(recoverTbls, &model.RecoverTableInfo{ SchemaID: sid, TableInfo: tblInfo, DropJobID: recoverSchemaInfo.DropJobID, diff --git a/pkg/ddl/schema_version.go b/pkg/ddl/schema_version.go index b0941b56bc290..6f35d0e60944d 100644 --- a/pkg/ddl/schema_version.go +++ b/pkg/ddl/schema_version.go @@ -266,17 +266,11 @@ func SetSchemaDiffForCreateTable(diff *model.SchemaDiff, job *model.Job) error { // SetSchemaDiffForRecoverSchema set SchemaDiff for ActionRecoverSchema. 
func SetSchemaDiffForRecoverSchema(diff *model.SchemaDiff, job *model.Job) error { - var ( - recoverSchemaInfo *RecoverSchemaInfo - recoverSchemaCheckFlag int64 - ) - err := job.DecodeArgs(&recoverSchemaInfo, &recoverSchemaCheckFlag) + args, err := model.GetRecoverArgs(job) if err != nil { return errors.Trace(err) } - // Reserved recoverSchemaCheckFlag value for gc work judgment. - job.Args[checkFlagIndexInJobArgs] = recoverSchemaCheckFlag - recoverTabsInfo := recoverSchemaInfo.RecoverTabsInfo + recoverTabsInfo := args.RecoverTableInfos() diff.AffectedOpts = make([]*model.AffectedOption, len(recoverTabsInfo)) for i := range recoverTabsInfo { diff.AffectedOpts[i] = &model.AffectedOption{ diff --git a/pkg/ddl/schematracker/checker.go b/pkg/ddl/schematracker/checker.go index c2112a12c3174..1c666a026d352 100644 --- a/pkg/ddl/schematracker/checker.go +++ b/pkg/ddl/schematracker/checker.go @@ -233,7 +233,7 @@ func (d *Checker) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) } // RecoverSchema implements the DDL interface. -func (*Checker) RecoverSchema(_ sessionctx.Context, _ *ddl.RecoverSchemaInfo) (err error) { +func (*Checker) RecoverSchema(_ sessionctx.Context, _ *model.RecoverSchemaInfo) (err error) { return nil } @@ -288,7 +288,7 @@ func (d *Checker) DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (er } // RecoverTable implements the DDL interface. 
-func (*Checker) RecoverTable(_ sessionctx.Context, _ *ddl.RecoverInfo) (err error) { +func (*Checker) RecoverTable(_ sessionctx.Context, _ *model.RecoverTableInfo) (err error) { //TODO implement me panic("implement me") } diff --git a/pkg/ddl/schematracker/dm_tracker.go b/pkg/ddl/schematracker/dm_tracker.go index 5e38d10559a8b..be9ce8891cfb2 100644 --- a/pkg/ddl/schematracker/dm_tracker.go +++ b/pkg/ddl/schematracker/dm_tracker.go @@ -320,7 +320,7 @@ func (d *SchemaTracker) DropTable(_ sessionctx.Context, stmt *ast.DropTableStmt) } // RecoverTable implements the DDL interface, which is no-op in DM's case. -func (*SchemaTracker) RecoverTable(_ sessionctx.Context, _ *ddl.RecoverInfo) (err error) { +func (*SchemaTracker) RecoverTable(_ sessionctx.Context, _ *model.RecoverTableInfo) (err error) { return nil } @@ -330,7 +330,7 @@ func (*SchemaTracker) FlashbackCluster(_ sessionctx.Context, _ uint64) (err erro } // RecoverSchema implements the DDL interface, which is no-op in DM's case. -func (*SchemaTracker) RecoverSchema(_ sessionctx.Context, _ *ddl.RecoverSchemaInfo) (err error) { +func (*SchemaTracker) RecoverSchema(_ sessionctx.Context, _ *model.RecoverSchemaInfo) (err error) { return nil } diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index 87932dd89ce42..89b2fa76a27b6 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -138,15 +138,13 @@ func onDropTableOrView(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver in } func (w *worker) onRecoverTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) { - var ( - recoverInfo *RecoverInfo - recoverTableCheckFlag int64 - ) - if err = job.DecodeArgs(&recoverInfo, &recoverTableCheckFlag); err != nil { + args, err := model.GetRecoverArgs(job) + if err != nil { // Invalid arguments, cancel this job. 
job.State = model.JobStateCancelled return ver, errors.Trace(err) } + recoverInfo := args.RecoverTableInfos()[0] schemaID := recoverInfo.SchemaID tblInfo := recoverInfo.TableInfo @@ -199,10 +197,11 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, t *meta.Meta, job *model.Job // none -> write only // check GC enable and update flag. if gcEnable { - job.Args[checkFlagIndexInJobArgs] = recoverCheckFlagEnableGC + args.CheckFlag = recoverCheckFlagEnableGC } else { - job.Args[checkFlagIndexInJobArgs] = recoverCheckFlagDisableGC + args.CheckFlag = recoverCheckFlagDisableGC } + job.FillArgs(args) job.SchemaState = model.StateWriteOnly tblInfo.State = model.StateWriteOnly @@ -243,7 +242,7 @@ func (w *worker) onRecoverTable(jobCtx *jobContext, t *meta.Meta, job *model.Job return ver, nil } -func (w *worker) recoverTable(t *meta.Meta, job *model.Job, recoverInfo *RecoverInfo) (ver int64, err error) { +func (w *worker) recoverTable(t *meta.Meta, job *model.Job, recoverInfo *model.RecoverTableInfo) (ver int64, err error) { var tids []int64 if recoverInfo.TableInfo.GetPartitionInfo() != nil { tids = getPartitionIDs(recoverInfo.TableInfo) @@ -288,6 +287,8 @@ func (w *worker) recoverTable(t *meta.Meta, job *model.Job, recoverInfo *Recover job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to update the label rule to PD") } + + // TODO(joechenrh): tid is used in SetSchemaDiffForDropTable, remove this after refactor done. job.CtxVars = []any{tids} return ver, nil } @@ -1338,18 +1339,19 @@ type schemaIDAndTableInfo struct { func onRepairTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID - tblInfo := &model.TableInfo{} - if err := job.DecodeArgs(tblInfo); err != nil { + args, err := model.GetRepairTableArgs(job) + if err != nil { // Invalid arguments, cancel this job. 
job.State = model.JobStateCancelled return ver, errors.Trace(err) } + tblInfo := args.TableInfo tblInfo.State = model.StateNone // Check the old DB and old table exist. - _, err := GetTableInfoAndCancelFaultJob(t, job, schemaID) + _, err = GetTableInfoAndCancelFaultJob(t, job, schemaID) if err != nil { return ver, errors.Trace(err) } diff --git a/pkg/ddl/table_lock.go b/pkg/ddl/table_lock.go index a2643bf937f67..ef6d6e6a13fae 100644 --- a/pkg/ddl/table_lock.go +++ b/pkg/ddl/table_lock.go @@ -24,28 +24,28 @@ import ( ) func onLockTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) { - arg := &LockTablesArg{} - if err := job.DecodeArgs(arg); err != nil { + args, err := model.GetLockTablesArgs(job) + if err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } // Unlock table first. - if arg.IndexOfUnlock < len(arg.UnlockTables) { - return unlockTables(jobCtx, t, job, arg) + if args.IndexOfUnlock < len(args.UnlockTables) { + return unlockTables(jobCtx, t, job, args) } // Check table locked by other, this can be only checked at the first time. - if arg.IndexOfLock == 0 { - for i, tl := range arg.LockTables { + if args.IndexOfLock == 0 { + for i, tl := range args.LockTables { job.SchemaID = tl.SchemaID job.TableID = tl.TableID tbInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID) if err != nil { return ver, err } - err = checkTableLocked(tbInfo, arg.LockTables[i].Tp, arg.SessionInfo) + err = checkTableLocked(tbInfo, args.LockTables[i].Tp, args.SessionInfo) if err != nil { // If any request table was locked by other session, just cancel this job. // No need to rolling back the unlocked tables, MySQL will release the lock first @@ -57,15 +57,15 @@ func onLockTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, } // Lock tables. 
- if arg.IndexOfLock < len(arg.LockTables) { - job.SchemaID = arg.LockTables[arg.IndexOfLock].SchemaID - job.TableID = arg.LockTables[arg.IndexOfLock].TableID + if args.IndexOfLock < len(args.LockTables) { + job.SchemaID = args.LockTables[args.IndexOfLock].SchemaID + job.TableID = args.LockTables[args.IndexOfLock].TableID var tbInfo *model.TableInfo tbInfo, err = GetTableInfoAndCancelFaultJob(t, job, job.SchemaID) if err != nil { return ver, err } - err = lockTable(tbInfo, arg.IndexOfLock, arg) + err = lockTable(tbInfo, args.IndexOfLock, args) if err != nil { job.State = model.JobStateCancelled return ver, err @@ -86,9 +86,9 @@ func onLockTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, if err != nil { return ver, errors.Trace(err) } - arg.IndexOfLock++ - job.Args = []any{arg} - if arg.IndexOfLock == len(arg.LockTables) { + args.IndexOfLock++ + job.FillArgs(args) + if args.IndexOfLock == len(args.LockTables) { // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, nil) } @@ -112,12 +112,12 @@ func findSessionInfoIndex(sessions []model.SessionInfo, sessionInfo model.Sessio } // lockTable uses to check table locked and acquire the table lock for the request session. -func lockTable(tbInfo *model.TableInfo, idx int, arg *LockTablesArg) error { +func lockTable(tbInfo *model.TableInfo, idx int, args *model.LockTablesArgs) error { if !tbInfo.IsLocked() { tbInfo.Lock = &model.TableLockInfo{ - Tp: arg.LockTables[idx].Tp, + Tp: args.LockTables[idx].Tp, } - tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, arg.SessionInfo) + tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, args.SessionInfo) return nil } // If the state of the lock is in pre-lock, then the lock must be locked by the current request. So we can just return here. 
@@ -125,14 +125,14 @@ func lockTable(tbInfo *model.TableInfo, idx int, arg *LockTablesArg) error { if tbInfo.Lock.State == model.TableLockStatePreLock { return nil } - if (tbInfo.Lock.Tp == pmodel.TableLockRead && arg.LockTables[idx].Tp == pmodel.TableLockRead) || - (tbInfo.Lock.Tp == pmodel.TableLockReadOnly && arg.LockTables[idx].Tp == pmodel.TableLockReadOnly) { - sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo) + if (tbInfo.Lock.Tp == pmodel.TableLockRead && args.LockTables[idx].Tp == pmodel.TableLockRead) || + (tbInfo.Lock.Tp == pmodel.TableLockReadOnly && args.LockTables[idx].Tp == pmodel.TableLockReadOnly) { + sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, args.SessionInfo) // repeat lock. if sessionIndex >= 0 { return nil } - tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, arg.SessionInfo) + tbInfo.Lock.Sessions = append(tbInfo.Lock.Sessions, args.SessionInfo) return nil } @@ -168,24 +168,24 @@ func checkTableLocked(tbInfo *model.TableInfo, lockTp pmodel.TableLockType, sess } // unlockTables uses unlock a batch of table lock one by one. -func unlockTables(jobCtx *jobContext, t *meta.Meta, job *model.Job, arg *LockTablesArg) (ver int64, err error) { - if arg.IndexOfUnlock >= len(arg.UnlockTables) { +func unlockTables(jobCtx *jobContext, t *meta.Meta, job *model.Job, args *model.LockTablesArgs) (ver int64, err error) { + if args.IndexOfUnlock >= len(args.UnlockTables) { return ver, nil } - job.SchemaID = arg.UnlockTables[arg.IndexOfUnlock].SchemaID - job.TableID = arg.UnlockTables[arg.IndexOfUnlock].TableID + job.SchemaID = args.UnlockTables[args.IndexOfUnlock].SchemaID + job.TableID = args.UnlockTables[args.IndexOfUnlock].TableID tbInfo, err := getTableInfo(t, job.TableID, job.SchemaID) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { // The table maybe has been dropped. just ignore this err and go on. 
- arg.IndexOfUnlock++ - job.Args = []any{arg} + args.IndexOfUnlock++ + job.FillArgs(args) return ver, nil } return ver, err } - needUpdateTableInfo := unlockTable(tbInfo, arg) + needUpdateTableInfo := unlockTable(tbInfo, args) if needUpdateTableInfo { ver, err = updateVersionAndTableInfo(jobCtx, t, job, tbInfo, true) if err != nil { @@ -193,22 +193,22 @@ func unlockTables(jobCtx *jobContext, t *meta.Meta, job *model.Job, arg *LockTab } } - arg.IndexOfUnlock++ - job.Args = []any{arg} + args.IndexOfUnlock++ + job.FillArgs(args) return ver, nil } // unlockTable uses to unlock table lock that hold by the session. -func unlockTable(tbInfo *model.TableInfo, arg *LockTablesArg) (needUpdateTableInfo bool) { +func unlockTable(tbInfo *model.TableInfo, args *model.LockTablesArgs) (needUpdateTableInfo bool) { if !tbInfo.IsLocked() { return false } - if arg.IsCleanup { + if args.IsCleanup { tbInfo.Lock = nil return true } - sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, arg.SessionInfo) + sessionIndex := findSessionInfoIndex(tbInfo.Lock.Sessions, args.SessionInfo) if sessionIndex < 0 { // When session clean table lock, session maybe send unlock table even the table lock maybe not hold by the session. // so just ignore and return here. @@ -224,15 +224,15 @@ func unlockTable(tbInfo *model.TableInfo, arg *LockTablesArg) (needUpdateTableIn } func onUnlockTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) { - arg := &LockTablesArg{} - if err := job.DecodeArgs(arg); err != nil { + args, err := model.GetLockTablesArgs(job) + if err != nil { // Invalid arguments, cancel this job. 
job.State = model.JobStateCancelled return ver, errors.Trace(err) } - ver, err = unlockTables(jobCtx, t, job, arg) - if arg.IndexOfUnlock == len(arg.UnlockTables) { + ver, err = unlockTables(jobCtx, t, job, args) + if args.IndexOfUnlock == len(args.UnlockTables) { job.FinishTableJob(model.JobStateDone, model.StateNone, ver, nil) } return ver, err diff --git a/pkg/ddl/table_test.go b/pkg/ddl/table_test.go index f4c1dec6f04e1..9a8157e3ef8de 100644 --- a/pkg/ddl/table_test.go +++ b/pkg/ddl/table_test.go @@ -115,7 +115,7 @@ func testLockTable( tblInfo *model.TableInfo, lockTp pmodel.TableLockType, ) *model.Job { - arg := &ddl.LockTablesArg{ + args := &model.LockTablesArgs{ LockTables: []model.TableLockTpInfo{{SchemaID: newSchemaID, TableID: tblInfo.ID, Tp: lockTp}}, SessionInfo: model.SessionInfo{ ServerID: uuid, @@ -123,17 +123,17 @@ func testLockTable( }, } job := &model.Job{ + Version: model.GetJobVerInUse(), SchemaID: newSchemaID, TableID: tblInfo.ID, Type: model.ActionLockTable, BinlogInfo: &model.HistoryInfo{}, - Args: []any{arg}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{ {Database: schemaName.L, Table: tblInfo.Name.L}, }, } ctx.SetValue(sessionctx.QueryString, "skip") - err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)) + err := d.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, args, true)) require.NoError(t, err) v := getSchemaVer(t, ctx) diff --git a/pkg/executor/ddl.go b/pkg/executor/ddl.go index fff2d2a751382..2aa576bf09001 100644 --- a/pkg/executor/ddl.go +++ b/pkg/executor/ddl.go @@ -422,7 +422,7 @@ func (e *DDLExec) executeRecoverTable(s *ast.RecoverTableStmt) error { return err } - recoverInfo := &ddl.RecoverInfo{ + recoverInfo := &model.RecoverTableInfo{ SchemaID: job.SchemaID, TableInfo: tblInfo, DropJobID: job.ID, @@ -473,9 +473,11 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, dom *domain.Do fmt.Sprintf("(Table ID %d)", job.TableID), ) } - // Return the cloned meta here, since meta will be modified later. 
- // This may corrupt the infocache. - return job, table.Meta().Clone(), nil + // We can't return the meta directly since it will be modified outside, which may corrupt the infocache. + // Since only State field is changed, returning a shallow copy is enough. + // see https://github.com/pingcap/tidb/issues/55462 + tblInfo := *table.Meta() + return job, &tblInfo, nil } // GetDropOrTruncateTableInfoFromJobs gets the dropped/truncated table information from DDL jobs, @@ -540,7 +542,12 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.J if tableInfo.TempTableType == model.TempTableGlobal { return nil, nil, exeerrors.ErrUnsupportedFlashbackTmpTable } - return jobInfo, tableInfo, nil + + // We can't return the meta directly since it will be modified outside, which may corrupt the infocache. + // Since only State field is changed, returning a shallow copy is enough. + // see https://github.com/pingcap/tidb/issues/55462 + tblInfo := *tableInfo + return jobInfo, &tblInfo, nil } func (e *DDLExec) executeFlashBackCluster(s *ast.FlashBackToTimestampStmt) error { @@ -579,7 +586,7 @@ func (e *DDLExec) executeFlashbackTable(s *ast.FlashBackTableStmt) error { return err } - recoverInfo := &ddl.RecoverInfo{ + recoverInfo := &model.RecoverTableInfo{ SchemaID: job.SchemaID, TableInfo: tblInfo, DropJobID: job.ID, @@ -620,7 +627,7 @@ func (e *DDLExec) executeFlashbackDatabase(s *ast.FlashBackDatabaseStmt) error { return err } -func (e *DDLExec) getRecoverDBByName(schemaName pmodel.CIStr) (recoverSchemaInfo *ddl.RecoverSchemaInfo, err error) { +func (e *DDLExec) getRecoverDBByName(schemaName pmodel.CIStr) (recoverSchemaInfo *model.RecoverSchemaInfo, err error) { txn, err := e.Ctx().Txn(true) if err != nil { return nil, err @@ -654,7 +661,7 @@ func (e *DDLExec) getRecoverDBByName(schemaName pmodel.CIStr) (recoverSchemaInfo if schemaInfo.Name.L != schemaName.L { continue } - recoverSchemaInfo = &ddl.RecoverSchemaInfo{ + recoverSchemaInfo = 
&model.RecoverSchemaInfo{ DBInfo: schemaInfo, LoadTablesOnExecute: true, DropJobID: job.ID, diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index 7f6a8fc268253..27630916979db 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -274,7 +274,7 @@ func updateAutoIDForExchangePartition(store kv.Storage, ptSchemaID, ptID, ntSche } // Set both tables to the maximum auto IDs between normal table and partitioned table. - newAutoIDs := meta.AutoIDGroup{ + newAutoIDs := model.AutoIDGroup{ RowID: max(ptAutoIDs.RowID, ntAutoIDs.RowID), IncrementID: max(ptAutoIDs.IncrementID, ntAutoIDs.IncrementID), RandomID: max(ptAutoIDs.RandomID, ntAutoIDs.RandomID), diff --git a/pkg/meta/meta.go b/pkg/meta/meta.go index 8af50ca8a5547..f45102e122ee0 100644 --- a/pkg/meta/meta.go +++ b/pkg/meta/meta.go @@ -833,7 +833,7 @@ func (m *Meta) GetSchemaCacheSize() (size uint64, isNull bool, err error) { // CreateTableAndSetAutoID creates a table with tableInfo in database, // and rebases the table autoID. -func (m *Meta) CreateTableAndSetAutoID(dbID int64, tableInfo *model.TableInfo, autoIDs AutoIDGroup) error { +func (m *Meta) CreateTableAndSetAutoID(dbID int64, tableInfo *model.TableInfo, autoIDs model.AutoIDGroup) error { err := m.CreateTableOrView(dbID, tableInfo) if err != nil { return errors.Trace(err) diff --git a/pkg/meta/meta_autoid.go b/pkg/meta/meta_autoid.go index e5071f5dbe899..d4ec69fcd5b60 100644 --- a/pkg/meta/meta_autoid.go +++ b/pkg/meta/meta_autoid.go @@ -93,8 +93,8 @@ func (a *autoIDAccessor) CopyTo(databaseID, tableID int64) error { // AutoIDAccessors represents all the auto IDs of a table. type AutoIDAccessors interface { - Get() (AutoIDGroup, error) - Put(autoIDs AutoIDGroup) error + Get() (model.AutoIDGroup, error) + Put(autoIDs model.AutoIDGroup) error Del() error AccessorPicker @@ -117,7 +117,7 @@ type autoIDAccessors struct { const sepAutoIncVer = model.TableInfoVersion5 // Get implements the interface AutoIDAccessors. 
-func (a *autoIDAccessors) Get() (autoIDs AutoIDGroup, err error) { +func (a *autoIDAccessors) Get() (autoIDs model.AutoIDGroup, err error) { if autoIDs.RowID, err = a.RowID().Get(); err != nil { return autoIDs, err } @@ -131,7 +131,7 @@ func (a *autoIDAccessors) Get() (autoIDs AutoIDGroup, err error) { } // Put implements the interface AutoIDAccessors. -func (a *autoIDAccessors) Put(autoIDs AutoIDGroup) error { +func (a *autoIDAccessors) Put(autoIDs model.AutoIDGroup) error { if err := a.RowID().Put(autoIDs.RowID); err != nil { return err } @@ -197,10 +197,3 @@ func NewAutoIDAccessors(m *Meta, databaseID, tableID int64) AutoIDAccessors { }, } } - -// AutoIDGroup represents a group of auto IDs of a specific table. -type AutoIDGroup struct { - RowID int64 - IncrementID int64 - RandomID int64 -} diff --git a/pkg/meta/meta_test.go b/pkg/meta/meta_test.go index bb59822466250..8b4e6cea5bb18 100644 --- a/pkg/meta/meta_test.go +++ b/pkg/meta/meta_test.go @@ -373,7 +373,7 @@ func TestMeta(t *testing.T) { ID: 3, Name: pmodel.NewCIStr("tbl3"), } - err = m.CreateTableAndSetAutoID(1, tbInfo3, meta.AutoIDGroup{RowID: 123, IncrementID: 0}) + err = m.CreateTableAndSetAutoID(1, tbInfo3, model.AutoIDGroup{RowID: 123, IncrementID: 0}) require.NoError(t, err) id, err := m.GetAutoIDAccessors(1, tbInfo3.ID).RowID().Get() require.NoError(t, err) diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 1ace03b010962..7191b5161c60f 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -23,6 +23,37 @@ import ( "github.com/pingcap/tidb/pkg/util/intest" ) +// AutoIDGroup represents a group of auto IDs of a specific table. +type AutoIDGroup struct { + RowID int64 + IncrementID int64 + RandomID int64 +} + +// RecoverTableInfo contains information needed by DDL.RecoverTable. 
+type RecoverTableInfo struct { + SchemaID int64 + TableInfo *TableInfo + DropJobID int64 + SnapshotTS uint64 + AutoIDs AutoIDGroup + OldSchemaName string + OldTableName string +} + +// RecoverSchemaInfo contains information needed by DDL.RecoverSchema. +type RecoverSchemaInfo struct { + *DBInfo + RecoverTableInfos []*RecoverTableInfo + // LoadTablesOnExecute means whether to load the tables to recover on job execution, + // to avoid persisting a large RecoverTableInfos in the job args. If it's true, the DDL + // owner loads the table infos instead of the job submit node. + LoadTablesOnExecute bool + DropJobID int64 + SnapshotTS uint64 + OldSchemaName pmodel.CIStr +} + // getOrDecodeArgsV2 get the argsV2 from job, if the argsV2 is nil, decode rawArgsV2 // and fill argsV2. func getOrDecodeArgsV2[T JobArgs](job *Job) (T, error) { @@ -1167,6 +1198,117 @@ func GetAddCheckConstraintArgs(job *Job) (*AddCheckConstraintArgs, error) { return getOrDecodeArgsV2[*AddCheckConstraintArgs](job) } +// LockTablesArgs is the argument for LockTables. +type LockTablesArgs struct { + LockTables []TableLockTpInfo `json:"lock_tables,omitempty"` + IndexOfLock int `json:"index_of_lock,omitempty"` + UnlockTables []TableLockTpInfo `json:"unlock_tables,omitempty"` + IndexOfUnlock int `json:"index_of_unlock,omitempty"` + SessionInfo SessionInfo `json:"session_info,omitempty"` + IsCleanup bool `json:"is_cleanup:omitempty"` // NOTE(review): tag looks typo'd (":" should be ","), so the JSON key is literally "is_cleanup:omitempty" and omitempty is inactive; if the legacy ddl.LockTablesArg used the same tag, the key is part of the persisted job-args format — verify before "fixing" +} + +func (a *LockTablesArgs) fillJob(job *Job) { + job.Args = []any{a} +} + +// GetLockTablesArgs gets the LockTablesArgs argument.
+func GetLockTablesArgs(job *Job) (*LockTablesArgs, error) { + var args *LockTablesArgs + var err error + + if job.Version == JobVersion1 { + err = job.DecodeArgs(&args) + } else { + args, err = getOrDecodeArgsV2[*LockTablesArgs](job) + } + + if err != nil { + return nil, errors.Trace(err) + } + return args, nil +} + +// RepairTableArgs is the argument for repair table +type RepairTableArgs struct { + *TableInfo `json:"table_info"` +} + +func (a *RepairTableArgs) fillJob(job *Job) { + if job.Version == JobVersion1 { + job.Args = []any{a.TableInfo} + return + } + job.Args = []any{a} +} + +// GetRepairTableArgs get the repair table args. +func GetRepairTableArgs(job *Job) (*RepairTableArgs, error) { + if job.Version == JobVersion1 { + var tblInfo *TableInfo + if err := job.DecodeArgs(&tblInfo); err != nil { + return nil, errors.Trace(err) + } + return &RepairTableArgs{tblInfo}, nil + } + + return getOrDecodeArgsV2[*RepairTableArgs](job) +} + +// RecoverArgs is the argument for recover table/schema. +type RecoverArgs struct { + RecoverInfo *RecoverSchemaInfo `json:"recover_info,omitempty"` + CheckFlag int64 `json:"check_flag,omitempty"` +} + +func (a *RecoverArgs) fillJob(job *Job) { + if job.Version == JobVersion1 { + if job.Type == ActionRecoverTable { + job.Args = []any{a.RecoverTableInfos()[0], a.CheckFlag} + } else { + job.Args = []any{a.RecoverInfo, a.CheckFlag} + } + return + } + job.Args = []any{a} +} + +// RecoverTableInfos get all the recover infos. +func (a *RecoverArgs) RecoverTableInfos() []*RecoverTableInfo { + return a.RecoverInfo.RecoverTableInfos +} + +// GetRecoverArgs get the recover table/schema args. 
+func GetRecoverArgs(job *Job) (*RecoverArgs, error) { + if job.Version == JobVersion1 { + var ( + recoverTableInfo *RecoverTableInfo + recoverSchemaInfo = &RecoverSchemaInfo{} + recoverCheckFlag int64 + ) + + if job.Type == ActionRecoverTable { + err := job.DecodeArgs(&recoverTableInfo, &recoverCheckFlag) + if err != nil { + return nil, errors.Trace(err) + } + recoverSchemaInfo.RecoverTableInfos = []*RecoverTableInfo{recoverTableInfo} + } else { + err := job.DecodeArgs(recoverSchemaInfo, &recoverCheckFlag) + if err != nil { + return nil, errors.Trace(err) + } + } + + return &RecoverArgs{ + RecoverInfo: recoverSchemaInfo, + CheckFlag: recoverCheckFlag, + }, nil + } + + return getOrDecodeArgsV2[*RecoverArgs](job) +} + // PlacementPolicyArgs is the argument for create/alter/drop placement policy type PlacementPolicyArgs struct { Policy *PolicyInfo `json:"policy,omitempty"` diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index 515d7a5e49a4b..338f92ad9cfc2 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -687,6 +687,72 @@ func TestCheckConstraintArgs(t *testing.T) { } } +func TestLockTableArgs(t *testing.T) { + inArgs := &LockTablesArgs{ + LockTables: []TableLockTpInfo{{1, 1, model.TableLockNone}}, + UnlockTables: []TableLockTpInfo{{2, 2, model.TableLockNone}}, + IndexOfLock: 13, + IndexOfUnlock: 24, + } + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + for _, tp := range []ActionType{ActionLockTable, ActionUnlockTable} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, tp))) + + args, err := GetLockTablesArgs(j2) + require.NoError(t, err) + require.Equal(t, inArgs.LockTables, args.LockTables) + require.Equal(t, inArgs.UnlockTables, args.UnlockTables) + require.Equal(t, inArgs.IndexOfLock, args.IndexOfLock) + require.Equal(t, inArgs.IndexOfUnlock, args.IndexOfUnlock) + } + } +} + +func TestRepairTableArgs(t *testing.T) { + inArgs := &RepairTableArgs{&TableInfo{ID: 1, 
Name: model.NewCIStr("t")}} + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionRepairTable))) + + args, err := GetRepairTableArgs(j2) + require.NoError(t, err) + require.Equal(t, inArgs.TableInfo, args.TableInfo) + } +} + +func TestRecoverArgs(t *testing.T) { + recoverInfo := &RecoverTableInfo{ + SchemaID: 1, + DropJobID: 2, + TableInfo: &TableInfo{ + ID: 100, + Name: model.NewCIStr("table"), + }, + OldSchemaName: "old", + OldTableName: "table", + } + + inArgs := &RecoverArgs{ + RecoverInfo: &RecoverSchemaInfo{ + RecoverTableInfos: []*RecoverTableInfo{recoverInfo}, + }, + CheckFlag: 2, + } + + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + for _, tp := range []ActionType{ActionRecoverTable, ActionRecoverSchema} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, tp))) + + args, err := GetRecoverArgs(j2) + require.NoError(t, err) + require.Equal(t, inArgs.CheckFlag, args.CheckFlag) + require.Equal(t, inArgs.RecoverInfo, args.RecoverInfo) + } + } +} + func TestPlacementPolicyArgs(t *testing.T) { inArgs := &PlacementPolicyArgs{ Policy: &PolicyInfo{ID: 1, Name: model.NewCIStr("policy"), State: StateDeleteOnly},