Skip to content

Commit

Permalink
*: fix issue of multi-schema change with foreign key (#40042)
Browse files Browse the repository at this point in the history
close #40037
  • Loading branch information
crazycs520 authored Dec 21, 2022
1 parent 13e2120 commit ad0c202
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 10 deletions.
12 changes: 10 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,7 @@ func addIndexForForeignKey(ctx sessionctx.Context, tbInfo *model.TableInfo) erro
if handleCol != nil && len(fk.Cols) == 1 && handleCol.Name.L == fk.Cols[0].L {
continue
}
if model.FindIndexByColumns(tbInfo, fk.Cols...) != nil {
if model.FindIndexByColumns(tbInfo, tbInfo.Indices, fk.Cols...) != nil {
continue
}
idxName := fk.Name
Expand Down Expand Up @@ -3264,6 +3264,14 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
return dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Alter Table")
}
}
// set name for anonymous foreign key.
maxForeignKeyID := tb.Meta().MaxForeignKeyID
for _, spec := range validSpecs {
if spec.Tp == ast.AlterTableAddConstraint && spec.Constraint.Tp == ast.ConstraintForeignKey && spec.Constraint.Name == "" {
maxForeignKeyID++
spec.Constraint.Name = fmt.Sprintf("fk_%d", maxForeignKeyID)
}
}

if len(validSpecs) > 1 {
sctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo()
Expand Down Expand Up @@ -6570,7 +6578,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode
if err != nil {
return err
}
if model.FindIndexByColumns(t.Meta(), fkInfo.Cols...) == nil {
if model.FindIndexByColumns(t.Meta(), t.Meta().Indices, fkInfo.Cols...) == nil {
// Need to auto create index for fk cols
if ctx.GetSessionVars().StmtCtx.MultiSchemaInfo == nil {
ctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo()
Expand Down
29 changes: 29 additions & 0 deletions ddl/fktest/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,3 +1562,32 @@ func getLatestSchemaDiff(t *testing.T, tk *testkit.TestKit) *model.SchemaDiff {
require.NoError(t, err)
return diff
}

func TestTestMultiSchemaAddForeignKey(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("use test")
tk.MustExec("create table t1 (id int key);")
tk.MustExec("create table t2 (a int, b int);")
tk.MustExec("alter table t2 add foreign key (a) references t1(id), add foreign key (b) references t1(id)")
tk.MustExec("alter table t2 add column c int, add column d int")
tk.MustExec("alter table t2 add foreign key (c) references t1(id), add foreign key (d) references t1(id), add index(c), add index(d)")
tk.MustExec("drop table t2")
tk.MustExec("create table t2 (a int, b int, index idx1(a), index idx2(b));")
tk.MustGetErrMsg("alter table t2 drop index idx1, drop index idx2, add foreign key (a) references t1(id), add foreign key (b) references t1(id)",
"[ddl:1553]Cannot drop index 'idx1': needed in a foreign key constraint")
tk.MustExec("alter table t2 drop index idx1, drop index idx2")
tk.MustExec("alter table t2 add foreign key (a) references t1(id), add foreign key (b) references t1(id)")
tk.MustQuery("show create table t2").Check(testkit.Rows("t2 CREATE TABLE `t2` (\n" +
" `a` int(11) DEFAULT NULL,\n" +
" `b` int(11) DEFAULT NULL,\n" +
" KEY `fk_1` (`a`),\n" +
" KEY `fk_2` (`b`),\n" +
" CONSTRAINT `fk_1` FOREIGN KEY (`a`) REFERENCES `test`.`t1` (`id`),\n" +
" CONSTRAINT `fk_2` FOREIGN KEY (`b`) REFERENCES `test`.`t1` (`id`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("drop table t2")
tk.MustExec("create table t2 (a int, b int, index idx0(a,b), index idx1(a), index idx2(b));")
tk.MustExec("alter table t2 drop index idx1, add foreign key (a) references t1(id), add foreign key (b) references t1(id)")
}
4 changes: 2 additions & 2 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func checkTableForeignKey(referTblInfo, tblInfo *model.TableInfo, fkInfo *model.
}
}
// check refer columns should have index.
if model.FindIndexByColumns(referTblInfo, fkInfo.RefCols...) == nil {
if model.FindIndexByColumns(referTblInfo, referTblInfo.Indices, fkInfo.RefCols...) == nil {
return infoschema.ErrForeignKeyNoIndexInParent.GenWithStackByArgs(fkInfo.Name, fkInfo.RefTable)
}
return nil
Expand Down Expand Up @@ -660,7 +660,7 @@ func checkAddForeignKeyValidInOwner(d *ddlCtx, t *meta.Meta, schema string, tbIn
return nil
}
}
if model.FindIndexByColumns(tbInfo, fk.Cols...) == nil {
if model.FindIndexByColumns(tbInfo, tbInfo.Indices, fk.Cols...) == nil {
return errors.Errorf("Failed to add the foreign key constraint. Missing index for '%s' foreign key columns in the table '%s'", fk.Name, tbInfo.Name)
}
return nil
Expand Down
36 changes: 35 additions & 1 deletion ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,10 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
case model.ActionRebaseAutoID, model.ActionModifyTableComment, model.ActionModifyTableCharsetAndCollate:
case model.ActionAddForeignKey:
fkInfo := job.Args[0].(*model.FKInfo)
info.ForeignKeys = append(info.ForeignKeys, fkInfo.Name)
info.AddForeignKeys = append(info.AddForeignKeys, model.AddForeignKeyInfo{
Name: fkInfo.Name,
Cols: fkInfo.Cols,
})
default:
return dbterror.ErrRunMultiSchemaChanges.FastGenByArgs(job.Type.String())
}
Expand Down Expand Up @@ -323,6 +326,32 @@ func checkOperateSameColAndIdx(info *model.MultiSchemaInfo) error {
return checkIndexes(info.AlterIndexes, true)
}

func checkOperateDropIndexUseByForeignKey(info *model.MultiSchemaInfo, t table.Table) error {
var remainIndexes, droppingIndexes []*model.IndexInfo
tbInfo := t.Meta()
for _, idx := range tbInfo.Indices {
dropping := false
for _, name := range info.DropIndexes {
if name.L == idx.Name.L {
dropping = true
break
}
}
if dropping {
droppingIndexes = append(droppingIndexes, idx)
} else {
remainIndexes = append(remainIndexes, idx)
}
}

for _, fk := range info.AddForeignKeys {
if droppingIdx := model.FindIndexByColumns(tbInfo, droppingIndexes, fk.Cols...); droppingIdx != nil && model.FindIndexByColumns(tbInfo, remainIndexes, fk.Cols...) == nil {
return dbterror.ErrDropIndexNeededInForeignKey.GenWithStackByArgs(droppingIdx.Name)
}
}
return nil
}

func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error {
err := checkOperateSameColAndIdx(info)
if err != nil {
Expand All @@ -334,6 +363,11 @@ func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error {
return err
}

err = checkOperateDropIndexUseByForeignKey(info, t)
if err != nil {
return err
}

return checkAddColumnTooManyColumns(len(t.Cols()) + len(info.AddColumns) - len(info.DropColumns))
}

Expand Down
9 changes: 8 additions & 1 deletion parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,12 +316,19 @@ type MultiSchemaInfo struct {
AddIndexes []CIStr `json:"-"`
DropIndexes []CIStr `json:"-"`
AlterIndexes []CIStr `json:"-"`
ForeignKeys []CIStr `json:"-"`

AddForeignKeys []AddForeignKeyInfo `json:"-"`

RelativeColumns []CIStr `json:"-"`
PositionColumns []CIStr `json:"-"`
}

// AddForeignKeyInfo contains foreign key information.
type AddForeignKeyInfo struct {
Name CIStr
Cols []CIStr
}

// NewMultiSchemaInfo new a MultiSchemaInfo.
func NewMultiSchemaInfo() *MultiSchemaInfo {
return &MultiSchemaInfo{
Expand Down
4 changes: 2 additions & 2 deletions parser/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,8 @@ func FindFKInfoByName(fks []*FKInfo, name string) *FKInfo {
}

// FindIndexByColumns find IndexInfo in indices which is cover the specified columns.
func FindIndexByColumns(tbInfo *TableInfo, cols ...CIStr) *IndexInfo {
for _, index := range tbInfo.Indices {
func FindIndexByColumns(tbInfo *TableInfo, indices []*IndexInfo, cols ...CIStr) *IndexInfo {
for _, index := range indices {
if IsIndexPrefixCovered(tbInfo, index, cols...) {
return index
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func buildFKCheck(ctx sessionctx.Context, tbl table.Table, cols []model.CIStr, f
}
}

referTbIdxInfo := model.FindIndexByColumns(tblInfo, cols...)
referTbIdxInfo := model.FindIndexByColumns(tblInfo, tblInfo.Indices, cols...)
if referTbIdxInfo == nil {
return nil, failedErr
}
Expand Down Expand Up @@ -460,7 +460,7 @@ func buildFKCascade(ctx sessionctx.Context, tp FKCascadeType, referredFK *model.
return fkCascade, nil
}
}
indexForFK := model.FindIndexByColumns(childTable.Meta(), fk.Cols...)
indexForFK := model.FindIndexByColumns(childTable.Meta(), childTable.Meta().Indices, fk.Cols...)
if indexForFK == nil {
return nil, errors.Errorf("Missing index for '%s' foreign key columns in the table '%s'", fk.Name, childTable.Meta().Name)
}
Expand Down

0 comments on commit ad0c202

Please sign in to comment.