Skip to content

Commit

Permalink
ddl: args v2 for rename tables (#55946)
Browse files Browse the repository at this point in the history
ref #53930
  • Loading branch information
joechenrh authored Sep 19, 2024
1 parent c13eb90 commit 249e88c
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 111 deletions.
30 changes: 14 additions & 16 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4294,19 +4294,14 @@ func (e *executor) renameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Id

func (e *executor) renameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Ident, isAlterTable bool) error {
is := e.infoCache.GetLatest()
oldTableNames := make([]*pmodel.CIStr, 0, len(oldIdents))
tableNames := make([]*pmodel.CIStr, 0, len(oldIdents))
oldSchemaIDs := make([]int64, 0, len(oldIdents))
newSchemaIDs := make([]int64, 0, len(oldIdents))
tableIDs := make([]int64, 0, len(oldIdents))
oldSchemaNames := make([]*pmodel.CIStr, 0, len(oldIdents))
involveSchemaInfo := make([]model.InvolvingSchemaInfo, 0, len(oldIdents)*2)

var schemas []*model.DBInfo
var tableID int64
var err error

tables := make(map[string]int64)
infos := make([]*model.RenameTableArgs, 0, len(oldIdents))
for i := 0; i < len(oldIdents); i++ {
schemas, tableID, err = ExtractTblInfos(is, oldIdents[i], newIdents[i], isAlterTable, tables)
if err != nil {
Expand All @@ -4319,12 +4314,15 @@ func (e *executor) renameTables(ctx sessionctx.Context, oldIdents, newIdents []a
}
}

tableIDs = append(tableIDs, tableID)
oldTableNames = append(oldTableNames, &oldIdents[i].Name)
tableNames = append(tableNames, &newIdents[i].Name)
oldSchemaIDs = append(oldSchemaIDs, schemas[0].ID)
newSchemaIDs = append(newSchemaIDs, schemas[1].ID)
oldSchemaNames = append(oldSchemaNames, &schemas[0].Name)
infos = append(infos, &model.RenameTableArgs{
OldSchemaID: schemas[0].ID,
OldSchemaName: schemas[0].Name,
OldTableName: oldIdents[i].Name,
NewSchemaID: schemas[1].ID,
NewTableName: newIdents[i].Name,
TableID: tableID,
})

involveSchemaInfo = append(involveSchemaInfo,
model.InvolvingSchemaInfo{
Database: schemas[0].Name.L, Table: oldIdents[i].Name.L,
Expand All @@ -4336,19 +4334,19 @@ func (e *executor) renameTables(ctx sessionctx.Context, oldIdents, newIdents []a
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schemas[1].ID,
TableID: tableIDs[0],
TableID: infos[0].TableID,
SchemaName: schemas[1].Name.L,
Type: model.ActionRenameTables,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, oldTableNames},
CtxVars: []any{append(oldSchemaIDs, newSchemaIDs...), tableIDs},
InvolvingSchemaInfo: involveSchemaInfo,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.RenameTablesArgs{RenameTableInfos: infos}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down
79 changes: 45 additions & 34 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,50 +697,61 @@ func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWra
return errors.Trace(err)
}

func job2SchemaIDs(jobW *JobWrapper) string {
return job2UniqueIDs(jobW, true)
}
func makeStringForIDs(ids []int64) string {
set := make(map[int64]struct{}, len(ids))
for _, id := range ids {
set[id] = struct{}{}
}

func job2TableIDs(jobW *JobWrapper) string {
return job2UniqueIDs(jobW, false)
s := make([]string, 0, len(set))
for id := range set {
s = append(s, strconv.FormatInt(id, 10))
}
slices.Sort(s)
return strings.Join(s, ",")
}

func job2UniqueIDs(jobW *JobWrapper, schema bool) string {
func job2SchemaIDs(jobW *JobWrapper) string {
switch jobW.Type {
case model.ActionExchangeTablePartition, model.ActionRenameTables, model.ActionRenameTable:
case model.ActionRenameTables:
var ids []int64
if jobW.Type == model.ActionRenameTable {
ids = getRenameTableUniqueIDs(jobW, schema)
} else {
if schema {
ids = jobW.CtxVars[0].([]int64)
} else {
ids = jobW.CtxVars[1].([]int64)
}
}

set := make(map[int64]struct{}, len(ids))
for _, id := range ids {
set[id] = struct{}{}
}
arg := jobW.JobArgs.(*model.RenameTablesArgs)
ids = make([]int64, 0, len(arg.RenameTableInfos)*2)
for _, info := range arg.RenameTableInfos {
ids = append(ids, info.OldSchemaID, info.NewSchemaID)
}
return makeStringForIDs(ids)
case model.ActionRenameTable:
oldSchemaID := jobW.JobArgs.(*model.RenameTableArgs).OldSchemaID
ids := []int64{oldSchemaID, jobW.SchemaID}
return makeStringForIDs(ids)
case model.ActionExchangeTablePartition:
ids := jobW.CtxVars[0].([]int64)
return makeStringForIDs(ids)
default:
return strconv.FormatInt(jobW.SchemaID, 10)
}
}

s := make([]string, 0, len(set))
for id := range set {
s = append(s, strconv.FormatInt(id, 10))
}
slices.Sort(s)
return strings.Join(s, ",")
func job2TableIDs(jobW *JobWrapper) string {
switch jobW.Type {
case model.ActionRenameTables:
var ids []int64
arg := jobW.JobArgs.(*model.RenameTablesArgs)
ids = make([]int64, 0, len(arg.RenameTableInfos))
for _, info := range arg.RenameTableInfos {
ids = append(ids, info.TableID)
}
return makeStringForIDs(ids)
case model.ActionExchangeTablePartition:
ids := jobW.CtxVars[1].([]int64)
return makeStringForIDs(ids)
case model.ActionTruncateTable:
if schema {
return strconv.FormatInt(jobW.SchemaID, 10)
}
newTableID := jobW.JobArgs.(*model.TruncateTableArgs).NewTableID
return strconv.FormatInt(jobW.TableID, 10) + "," + strconv.FormatInt(newTableID, 10)
default:
return strconv.FormatInt(jobW.TableID, 10)
}
if schema {
return strconv.FormatInt(jobW.SchemaID, 10)
}
return strconv.FormatInt(jobW.TableID, 10)
}

func updateDDLJob2Table(se *sess.Session, job *model.Job, updateRawArgs bool) error {
Expand Down
25 changes: 10 additions & 15 deletions pkg/ddl/schema_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/mathutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -103,30 +102,26 @@ func SetSchemaDiffForRenameTable(diff *model.SchemaDiff, job *model.Job) error {

// SetSchemaDiffForRenameTables set SchemaDiff for ActionRenameTables.
func SetSchemaDiffForRenameTables(diff *model.SchemaDiff, job *model.Job) error {
var (
oldSchemaIDs, newSchemaIDs, tableIDs []int64
tableNames, oldSchemaNames []*pmodel.CIStr
)
err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &tableNames, &tableIDs, &oldSchemaNames)
args, err := model.GetRenameTablesArgs(job)
if err != nil {
return errors.Trace(err)
}
affects := make([]*model.AffectedOption, len(newSchemaIDs)-1)
for i, newSchemaID := range newSchemaIDs {
affects := make([]*model.AffectedOption, len(args.RenameTableInfos)-1)
for i, info := range args.RenameTableInfos {
// Do not add the first table to AffectedOpts. Related issue tidb#47064.
if i == 0 {
continue
}
affects[i-1] = &model.AffectedOption{
SchemaID: newSchemaID,
TableID: tableIDs[i],
OldTableID: tableIDs[i],
OldSchemaID: oldSchemaIDs[i],
SchemaID: info.NewSchemaID,
TableID: info.TableID,
OldTableID: info.TableID,
OldSchemaID: info.OldSchemaID,
}
}
diff.TableID = tableIDs[0]
diff.SchemaID = newSchemaIDs[0]
diff.OldSchemaID = oldSchemaIDs[0]
diff.TableID = args.RenameTableInfos[0].TableID
diff.SchemaID = args.RenameTableInfos[0].NewSchemaID
diff.OldSchemaID = args.RenameTableInfos[0].OldSchemaID
diff.AffectedOpts = affects
return nil
}
Expand Down
73 changes: 38 additions & 35 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func onRenameTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
return ver, errors.Trace(err)
}
oldTableName := tblInfo.Name
ver, err = checkAndRenameTables(t, job, tblInfo, oldSchemaID, job.SchemaID, &oldSchemaName, &tableName)
ver, err = checkAndRenameTables(t, job, tblInfo, args)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -779,35 +779,31 @@ func onRenameTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
}

func onRenameTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
oldSchemaIDs := []int64{}
newSchemaIDs := []int64{}
tableNames := []*pmodel.CIStr{}
tableIDs := []int64{}
oldSchemaNames := []*pmodel.CIStr{}
oldTableNames := []*pmodel.CIStr{}
if err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &tableNames, &tableIDs, &oldSchemaNames, &oldTableNames); err != nil {
args, err := model.GetRenameTablesArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if job.SchemaState == model.StatePublic {
return finishJobRenameTables(jobCtx, t, job, tableNames, tableIDs, newSchemaIDs)
return finishJobRenameTables(jobCtx, t, job, args)
}

var err error
fkh := newForeignKeyHelper()
for i, oldSchemaID := range oldSchemaIDs {
job.TableID = tableIDs[i]
job.TableName = oldTableNames[i].L
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, oldSchemaID)
for _, info := range args.RenameTableInfos {
job.TableID = info.TableID
job.TableName = info.OldTableName.L
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, info.OldSchemaID)
if err != nil {
return ver, errors.Trace(err)
}
ver, err := checkAndRenameTables(t, job, tblInfo, oldSchemaID, newSchemaIDs[i], oldSchemaNames[i], tableNames[i])
ver, err := checkAndRenameTables(t, job, tblInfo, info)
if err != nil {
return ver, errors.Trace(err)
}
err = adjustForeignKeyChildTableInfoAfterRenameTable(jobCtx.infoCache, t, job, &fkh, tblInfo, *oldSchemaNames[i], *oldTableNames[i], *tableNames[i], newSchemaIDs[i])
err = adjustForeignKeyChildTableInfoAfterRenameTable(
jobCtx.infoCache, t, job, &fkh, tblInfo,
info.OldSchemaName, info.OldTableName, info.NewTableName, info.NewSchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand All @@ -821,43 +817,43 @@ func onRenameTables(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64
return ver, nil
}

func checkAndRenameTables(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, oldSchemaID, newSchemaID int64, oldSchemaName, tableName *pmodel.CIStr) (ver int64, _ error) {
err := t.DropTableOrView(oldSchemaID, tblInfo.ID)
func checkAndRenameTables(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, args *model.RenameTableArgs) (ver int64, _ error) {
err := t.DropTableOrView(args.OldSchemaID, tblInfo.ID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

failpoint.Inject("renameTableErr", func(val failpoint.Value) {
if valStr, ok := val.(string); ok {
if tableName.L == valStr {
if args.NewTableName.L == valStr {
job.State = model.JobStateCancelled
failpoint.Return(ver, errors.New("occur an error after renaming table"))
}
}
})

oldTableName := tblInfo.Name
tableRuleID, partRuleIDs, oldRuleIDs, oldRules, err := getOldLabelRules(tblInfo, oldSchemaName.L, oldTableName.L)
tableRuleID, partRuleIDs, oldRuleIDs, oldRules, err := getOldLabelRules(tblInfo, args.OldSchemaName.L, oldTableName.L)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to get old label rules from PD")
}

if tblInfo.AutoIDSchemaID == 0 && newSchemaID != oldSchemaID {
if tblInfo.AutoIDSchemaID == 0 && args.NewSchemaID != args.OldSchemaID {
// The auto id is referenced by a schema id + table id
// Table ID is not changed between renames, but schema id can change.
// To allow concurrent use of the auto id during rename, keep the auto id
// by always reference it with the schema id it was originally created in.
tblInfo.AutoIDSchemaID = oldSchemaID
tblInfo.AutoIDSchemaID = args.OldSchemaID
}
if newSchemaID == tblInfo.AutoIDSchemaID {
if args.NewSchemaID == tblInfo.AutoIDSchemaID {
// Back to the original schema id, no longer needed.
tblInfo.AutoIDSchemaID = 0
}

tblInfo.Name = *tableName
err = t.CreateTableOrView(newSchemaID, tblInfo)
tblInfo.Name = args.NewTableName
err = t.CreateTableOrView(args.NewSchemaID, tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -872,7 +868,10 @@ func checkAndRenameTables(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo
return ver, nil
}

func adjustForeignKeyChildTableInfoAfterRenameTable(infoCache *infoschema.InfoCache, t *meta.Meta, job *model.Job, fkh *foreignKeyHelper, tblInfo *model.TableInfo, oldSchemaName, oldTableName, newTableName pmodel.CIStr, newSchemaID int64) error {
func adjustForeignKeyChildTableInfoAfterRenameTable(
infoCache *infoschema.InfoCache, t *meta.Meta, job *model.Job,
fkh *foreignKeyHelper, tblInfo *model.TableInfo,
oldSchemaName, oldTableName, newTableName pmodel.CIStr, newSchemaID int64) error {
if !variable.EnableForeignKey.Load() || newTableName.L == oldTableName.L {
return nil
}
Expand Down Expand Up @@ -944,15 +943,15 @@ func finishJobRenameTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (int
return ver, nil
}

func finishJobRenameTables(jobCtx *jobContext, t *meta.Meta, job *model.Job,
tableNames []*pmodel.CIStr, tableIDs, newSchemaIDs []int64) (int64, error) {
tblSchemaIDs := make(map[int64]int64, len(tableIDs))
for i := range tableIDs {
tblSchemaIDs[tableIDs[i]] = newSchemaIDs[i]
func finishJobRenameTables(jobCtx *jobContext, t *meta.Meta, job *model.Job, args *model.RenameTablesArgs) (int64, error) {
infos := args.RenameTableInfos
tblSchemaIDs := make(map[int64]int64, len(infos))
for _, info := range infos {
tblSchemaIDs[info.TableID] = info.NewSchemaID
}
tblInfos := make([]*model.TableInfo, 0, len(tableNames))
for i := range tableIDs {
tblID := tableIDs[i]
tblInfos := make([]*model.TableInfo, 0, len(infos))
for _, info := range infos {
tblID := info.TableID
tblInfo, err := getTableInfo(t, tblID, tblSchemaIDs[tblID])
if err != nil {
job.State = model.JobStateCancelled
Expand All @@ -963,9 +962,13 @@ func finishJobRenameTables(jobCtx *jobContext, t *meta.Meta, job *model.Job,
// Before updating the schema version, we need to reset the old schema ID to new schema ID, so that
// the table info can be dropped normally in `ApplyDiff`. This is because renaming table requires two
// schema versions to complete.
// TODO(joechenrh): set the old schemaID in Args is a bit hacky. Maybe we need a better solution.
var err error
oldRawArgs := job.RawArgs
job.Args[0] = newSchemaIDs
for _, info := range infos {
info.OldSchemaID = info.NewSchemaID
}
job.FillArgs(args)
job.RawArgs, err = json.Marshal(job.Args)
if err != nil {
return 0, errors.Trace(err)
Expand Down
Loading

0 comments on commit 249e88c

Please sign in to comment.