Skip to content

Commit

Permalink
ddl: args v2 for lock/unlock/repair table and recover table/schema (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
joechenrh authored Sep 25, 2024
1 parent 9129323 commit 75483d8
Show file tree
Hide file tree
Showing 19 changed files with 351 additions and 180 deletions.
8 changes: 4 additions & 4 deletions pkg/ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3069,11 +3069,11 @@ func TestIssue52680(t *testing.T) {

testSteps := []struct {
sql string
expect meta.AutoIDGroup
expect model.AutoIDGroup
}{
{sql: "", expect: meta.AutoIDGroup{RowID: 0, IncrementID: 4000, RandomID: 0}},
{sql: "drop table issue52680", expect: meta.AutoIDGroup{RowID: 0, IncrementID: 0, RandomID: 0}},
{sql: "recover table issue52680", expect: meta.AutoIDGroup{RowID: 0, IncrementID: 4000, RandomID: 0}},
{sql: "", expect: model.AutoIDGroup{RowID: 0, IncrementID: 4000, RandomID: 0}},
{sql: "drop table issue52680", expect: model.AutoIDGroup{RowID: 0, IncrementID: 0, RandomID: 0}},
{sql: "recover table issue52680", expect: model.AutoIDGroup{RowID: 0, IncrementID: 4000, RandomID: 0}},
}
for _, step := range testSteps {
if step.sql != "" {
Expand Down
34 changes: 3 additions & 31 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/owner"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand All @@ -78,9 +77,6 @@ const (

reorgWorkerCnt = 10
generalWorkerCnt = 10

// checkFlagIndexInJobArgs is the recoverCheckFlag index used in RecoverTable/RecoverSchema job arg list.
checkFlagIndexInJobArgs = 1
)

const (
Expand Down Expand Up @@ -1033,24 +1029,24 @@ func (d *ddl) cleanDeadTableLock(unlockTables []model.TableLockTpInfo, se model.
if len(unlockTables) == 0 {
return nil
}
arg := &LockTablesArg{
args := &model.LockTablesArgs{
UnlockTables: unlockTables,
SessionInfo: se,
}
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: unlockTables[0].SchemaID,
TableID: unlockTables[0].TableID,
Type: model.ActionUnlockTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{arg},
}

ctx, err := d.sessPool.Get()
if err != nil {
return err
}
defer d.sessPool.Put(ctx)
err = d.executor.DoDDLJob(ctx, job)
err = d.executor.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -1100,30 +1096,6 @@ func (d *ddl) SwitchMDL(enable bool) error {
return nil
}

// RecoverInfo contains information needed by DDL.RecoverTable.
type RecoverInfo struct {
SchemaID int64
TableInfo *model.TableInfo
DropJobID int64
SnapshotTS uint64
AutoIDs meta.AutoIDGroup
OldSchemaName string
OldTableName string
}

// RecoverSchemaInfo contains information needed by DDL.RecoverSchema.
type RecoverSchemaInfo struct {
*model.DBInfo
RecoverTabsInfo []*RecoverInfo
// LoadTablesOnExecute is the new logic to avoid a large RecoverTabsInfo can't be
// persisted. If it's true, DDL owner will recover RecoverTabsInfo instead of the
// job submit node.
LoadTablesOnExecute bool
DropJobID int64
SnapshotTS uint64
OldSchemaName pmodel.CIStr
}

// delayForAsyncCommit sleeps `SafeWindow + AllowedClockDrift` before a DDL job finishes.
// It should be called before any DDL that could break data consistency.
// This provides a safe window for async commit and 1PC to commit with an old schema.
Expand Down
76 changes: 39 additions & 37 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ type Executor interface {
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error)
RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *RecoverSchemaInfo) error
RecoverTable(ctx sessionctx.Context, recoverTableInfo *model.RecoverTableInfo) (err error)
RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *model.RecoverSchemaInfo) error
DropView(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error
DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt) error
Expand Down Expand Up @@ -797,7 +797,7 @@ func (e *executor) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt
return nil
}

func (e *executor) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *RecoverSchemaInfo) error {
func (e *executor) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *model.RecoverSchemaInfo) error {
involvedSchemas := []model.InvolvingSchemaInfo{{
Database: recoverSchemaInfo.DBInfo.Name.L,
Table: model.InvolvingAll,
Expand All @@ -810,14 +810,19 @@ func (e *executor) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *Reco
}
recoverSchemaInfo.State = model.StateNone
job := &model.Job{
Version: model.GetJobVerInUse(),
Type: model.ActionRecoverSchema,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{recoverSchemaInfo, recoverCheckFlagNone},
InvolvingSchemaInfo: involvedSchemas,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err := e.DoDDLJob(ctx, job)

args := &model.RecoverArgs{
RecoverInfo: recoverSchemaInfo,
CheckFlag: recoverCheckFlagNone,
}
err := e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -1443,9 +1448,9 @@ func (e *executor) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64)
return errors.Trace(err)
}

func (e *executor) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (err error) {
func (e *executor) RecoverTable(ctx sessionctx.Context, recoverTableInfo *model.RecoverTableInfo) (err error) {
is := e.infoCache.GetLatest()
schemaID, tbInfo := recoverInfo.SchemaID, recoverInfo.TableInfo
schemaID, tbInfo := recoverTableInfo.SchemaID, recoverTableInfo.TableInfo
// Check schema exist.
schema, ok := is.SchemaByID(schemaID)
if !ok {
Expand All @@ -1461,28 +1466,33 @@ func (e *executor) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo
// for "flashback table xxx to yyy"
// Note: this case only allow change table name, schema remains the same.
var involvedSchemas []model.InvolvingSchemaInfo
if recoverInfo.OldTableName != tbInfo.Name.L {
if recoverTableInfo.OldTableName != tbInfo.Name.L {
involvedSchemas = []model.InvolvingSchemaInfo{
{Database: schema.Name.L, Table: recoverInfo.OldTableName},
{Database: schema.Name.L, Table: recoverTableInfo.OldTableName},
{Database: schema.Name.L, Table: tbInfo.Name.L},
}
}

tbInfo.State = model.StateNone
job := &model.Job{
SchemaID: schemaID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: tbInfo.Name.L,

Version: model.GetJobVerInUse(),
SchemaID: schemaID,
TableID: tbInfo.ID,
SchemaName: schema.Name.L,
TableName: tbInfo.Name.L,
Type: model.ActionRecoverTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{recoverInfo, recoverCheckFlagNone},
InvolvingSchemaInfo: involvedSchemas,
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)

args := &model.RecoverArgs{
RecoverInfo: &model.RecoverSchemaInfo{
RecoverTableInfos: []*model.RecoverTableInfo{recoverTableInfo},
},
CheckFlag: recoverCheckFlagNone}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -5318,24 +5328,24 @@ func (e *executor) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt)
}

unlockTables := ctx.GetAllTableLocks()
arg := &LockTablesArg{
args := &model.LockTablesArgs{
LockTables: lockTables,
UnlockTables: unlockTables,
SessionInfo: sessionInfo,
}
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: lockTables[0].SchemaID,
TableID: lockTables[0].TableID,
Type: model.ActionLockTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{arg},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: involveSchemaInfo,
SQLMode: ctx.GetSessionVars().SQLMode,
}
// AddTableLock here is avoiding this job was executed successfully but the session was killed before return.
ctx.AddTableLock(lockTables)
err := e.DoDDLJob(ctx, job)
err := e.doDDLJob2(ctx, job, args)
if err == nil {
ctx.ReleaseTableLocks(unlockTables)
ctx.AddTableLock(lockTables)
Expand All @@ -5348,7 +5358,7 @@ func (e *executor) UnlockTables(ctx sessionctx.Context, unlockTables []model.Tab
if len(unlockTables) == 0 {
return nil
}
arg := &LockTablesArg{
args := &model.LockTablesArgs{
UnlockTables: unlockTables,
SessionInfo: model.SessionInfo{
ServerID: e.uuid,
Expand All @@ -5373,17 +5383,17 @@ func (e *executor) UnlockTables(ctx sessionctx.Context, unlockTables []model.Tab
})
}
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: unlockTables[0].SchemaID,
TableID: unlockTables[0].TableID,
Type: model.ActionUnlockTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{arg},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: involveSchemaInfo,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err := e.DoDDLJob(ctx, job)
err := e.doDDLJob2(ctx, job, args)
if err == nil {
ctx.ReleaseAllTableLocks()
}
Expand Down Expand Up @@ -5440,37 +5450,27 @@ func (e *executor) CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableN
return nil
}

arg := &LockTablesArg{
args := &model.LockTablesArgs{
UnlockTables: cleanupTables,
IsCleanup: true,
}
job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: cleanupTables[0].SchemaID,
TableID: cleanupTables[0].TableID,
Type: model.ActionUnlockTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{arg},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: involvingSchemaInfo,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err := e.DoDDLJob(ctx, job)
err := e.doDDLJob2(ctx, job, args)
if err == nil {
ctx.ReleaseTableLocks(cleanupTables)
}
return errors.Trace(err)
}

// LockTablesArg is the argument for LockTables, export for test.
type LockTablesArg struct {
LockTables []model.TableLockTpInfo
IndexOfLock int
UnlockTables []model.TableLockTpInfo
IndexOfUnlock int
SessionInfo model.SessionInfo
IsCleanup bool
}

func (e *executor) RepairTable(ctx sessionctx.Context, createStmt *ast.CreateTableStmt) error {
// Existence of DB and table has been checked in the preprocessor.
oldTableInfo, ok := (ctx.Value(domainutil.RepairedTable)).(*model.TableInfo)
Expand Down Expand Up @@ -5535,17 +5535,19 @@ func (e *executor) RepairTable(ctx sessionctx.Context, createStmt *ast.CreateTab
newTableInfo.State = model.StateNone

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: oldDBInfo.ID,
TableID: newTableInfo.ID,
SchemaName: oldDBInfo.Name.L,
TableName: newTableInfo.Name.L,
Type: model.ActionRepairTable,
BinlogInfo: &model.HistoryInfo{},
Args: []any{newTableInfo},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)

args := &model.RepairTableArgs{TableInfo: newTableInfo}
err = e.doDDLJob2(ctx, job, args)
if err == nil {
// Remove the old TableInfo from repairInfo before domain reload.
domainutil.RepairInfo.RemoveFromRepairInfo(oldDBInfo.Name.L, oldTableInfo.Name.L)
Expand Down
16 changes: 4 additions & 12 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,11 @@ func (w *worker) deleteDDLJob(job *model.Job) error {
}

func finishRecoverTable(w *worker, job *model.Job) error {
var (
recoverInfo *RecoverInfo
recoverTableCheckFlag int64
)
err := job.DecodeArgs(&recoverInfo, &recoverTableCheckFlag)
args, err := model.GetRecoverArgs(job)
if err != nil {
return errors.Trace(err)
}
if recoverTableCheckFlag == recoverCheckFlagEnableGC {
if args.CheckFlag == recoverCheckFlagEnableGC {
err = enableGC(w)
if err != nil {
return errors.Trace(err)
Expand All @@ -411,15 +407,11 @@ func finishRecoverTable(w *worker, job *model.Job) error {
}

func finishRecoverSchema(w *worker, job *model.Job) error {
var (
recoverSchemaInfo *RecoverSchemaInfo
recoverSchemaCheckFlag int64
)
err := job.DecodeArgs(&recoverSchemaInfo, &recoverSchemaCheckFlag)
args, err := model.GetRecoverArgs(job)
if err != nil {
return errors.Trace(err)
}
if recoverSchemaCheckFlag == recoverCheckFlagEnableGC {
if args.CheckFlag == recoverCheckFlagEnableGC {
err = enableGC(w)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2833,7 +2833,7 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, t *meta.Meta, job
// Set both tables to the maximum auto IDs between normal table and partitioned table.
// TODO: Fix the issue of big transactions during EXCHANGE PARTITION with AutoID.
// Similar to https://github.com/pingcap/tidb/issues/46904
newAutoIDs := meta.AutoIDGroup{
newAutoIDs := model.AutoIDGroup{
RowID: mathutil.Max(ptAutoIDs.RowID, ntAutoIDs.RowID),
IncrementID: mathutil.Max(ptAutoIDs.IncrementID, ntAutoIDs.IncrementID),
RandomID: mathutil.Max(ptAutoIDs.RandomID, ntAutoIDs.RandomID),
Expand Down
Loading

0 comments on commit 75483d8

Please sign in to comment.