Skip to content

Commit

Permalink
ddl: fix secondary index len check for clustered index (#23710)
Browse files Browse the repository at this point in the history
  • Loading branch information
lysu authored Apr 1, 2021
1 parent a8a7c01 commit 91bcb21
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 39 deletions.
80 changes: 80 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1510,6 +1510,86 @@ func (s *testIntegrationSuite8) TestCreateTooManyIndexes(c *C) {
tk.MustGetErrCode(alterSQL, errno.ErrTooManyKeys)
}

func (s *testIntegrationSuite8) TestCreateSecondaryIndexInCluster(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

// test create table with non-unique key
tk.MustGetErrCode(`
CREATE TABLE t (
c01 varchar(255) NOT NULL,
c02 varchar(255) NOT NULL,
c03 varchar(255) NOT NULL,
c04 varchar(255) DEFAULT NULL,
c05 varchar(255) DEFAULT NULL,
c06 varchar(255) DEFAULT NULL,
PRIMARY KEY (c01,c02,c03) clustered,
KEY c04 (c04)
)`, errno.ErrTooLongKey)

// test create long clustered primary key.
tk.MustGetErrCode(`
CREATE TABLE t (
c01 varchar(255) NOT NULL,
c02 varchar(255) NOT NULL,
c03 varchar(255) NOT NULL,
c04 varchar(255) NOT NULL,
c05 varchar(255) DEFAULT NULL,
c06 varchar(255) DEFAULT NULL,
PRIMARY KEY (c01,c02,c03,c04) clustered
)`, errno.ErrTooLongKey)

// test create table with unique key
tk.MustExec(`
CREATE TABLE t (
c01 varchar(255) NOT NULL,
c02 varchar(255) NOT NULL,
c03 varchar(255) NOT NULL,
c04 varchar(255) DEFAULT NULL,
c05 varchar(255) DEFAULT NULL,
c06 varchar(255) DEFAULT NULL,
PRIMARY KEY (c01,c02,c03) clustered,
unique key c04 (c04)
)`)
tk.MustExec("drop table t")

// test create index
tk.MustExec(`
CREATE TABLE t (
c01 varchar(255) NOT NULL,
c02 varchar(255) NOT NULL,
c03 varchar(255) NOT NULL,
c04 varchar(255) DEFAULT NULL,
c05 varchar(255) DEFAULT NULL,
c06 varchar(255) DEFAULT NULL,
PRIMARY KEY (c01,c02) clustered
)`)
tk.MustExec("create index idx1 on t(c03)")
tk.MustGetErrCode("create index idx2 on t(c03, c04)", errno.ErrTooLongKey)
tk.MustExec("create unique index uk2 on t(c03, c04)")
tk.MustExec("drop table t")

// test change/modify column
tk.MustExec(`
CREATE TABLE t (
c01 varchar(255) NOT NULL,
c02 varchar(255) NOT NULL,
c03 varchar(255) NOT NULL,
c04 varchar(255) DEFAULT NULL,
c05 varchar(255) DEFAULT NULL,
c06 varchar(255) DEFAULT NULL,
Index idx1(c03),
PRIMARY KEY (c01,c02) clustered,
unique index uk1(c06)
)`)
tk.MustExec("alter table t change c03 c10 varchar(256) default null")
tk.MustGetErrCode("alter table t change c10 c100 varchar(1024) default null", errno.ErrTooLongKey)
tk.MustGetErrCode("alter table t modify c10 varchar(600) default null", errno.ErrTooLongKey)
tk.MustExec("alter table t modify c06 varchar(600) default null")
tk.MustGetErrCode("alter table t modify c01 varchar(510)", errno.ErrTooLongKey)
tk.MustExec("create table t2 like t")
}

func (s *testIntegrationSuite3) TestAlterColumn(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test_db")
Expand Down
129 changes: 109 additions & 20 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1505,6 +1505,44 @@ func buildTableInfo(
idxInfo.ID = allocateIndexID(tbInfo)
tbInfo.Indices = append(tbInfo.Indices, idxInfo)
}
if tbInfo.IsCommonHandle {
// Ensure tblInfo's each non-unique secondary-index's len + primary-key's len <= MaxIndexLength for clustered index table.
var pkLen, idxLen int
pkLen, err = indexColumnsLen(tbInfo.Columns, tables.FindPrimaryIndex(tbInfo).Columns)
if err != nil {
return
}
for _, idx := range tbInfo.Indices {
if idx.Unique {
// Only need check for non-unique secondary-index.
continue
}
idxLen, err = indexColumnsLen(tbInfo.Columns, idx.Columns)
if err != nil {
return
}
if pkLen+idxLen > config.GetGlobalConfig().MaxIndexLength {
return nil, errTooLongKey.GenWithStackByArgs(config.GetGlobalConfig().MaxIndexLength)
}
}
}
return
}

func indexColumnsLen(cols []*model.ColumnInfo, idxCols []*model.IndexColumn) (len int, err error) {
for _, idxCol := range idxCols {
col := model.FindColumnInfo(cols, idxCol.Name.L)
if col == nil {
err = errKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", idxCol.Name.L)
return
}
var colLen int
colLen, err = getIndexColumnLength(col, idxCol.Length)
if err != nil {
return
}
len += colLen
}
return
}

Expand Down Expand Up @@ -3888,39 +3926,74 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
// checkColumnWithIndexConstraint is used to check the related index constraint of the modified column.
// Index has a max-prefix-length constraint. eg: a varchar(100), index idx(a), modifying column a to a varchar(4000)
// will cause index idx to break the max-prefix-length constraint.
//
// For clustered index:
// Change column in pk need recheck all non-unique index, new pk len + index len < maxIndexLength.
// Change column in secondary only need related index, pk len + new index len < maxIndexLength.
func checkColumnWithIndexConstraint(tbInfo *model.TableInfo, originalCol, newCol *model.ColumnInfo) error {
var columns []*model.ColumnInfo
for _, indexInfo := range tbInfo.Indices {
containColumn := false
columns := make([]*model.ColumnInfo, 0, len(tbInfo.Columns))
columns = append(columns, tbInfo.Columns...)
// Replace old column with new column.
for i, col := range columns {
if col.Name.L != originalCol.Name.L {
continue
}
columns[i] = newCol.Clone()
columns[i].Name = originalCol.Name
break
}

pkIndex := tables.FindPrimaryIndex(tbInfo)
var clusteredPkLen int
if tbInfo.IsCommonHandle {
var err error
clusteredPkLen, err = indexColumnsLen(columns, pkIndex.Columns)
if err != nil {
return err
}
}

checkOneIndex := func(indexInfo *model.IndexInfo, pkLenAppendToKey int, skipCheckIfNotModify bool) (modified bool, err error) {
for _, col := range indexInfo.Columns {
if col.Name.L == originalCol.Name.L {
containColumn = true
modified = true
break
}
}
if !containColumn {
continue
if skipCheckIfNotModify && !modified {
return
}
if columns == nil {
columns = make([]*model.ColumnInfo, 0, len(tbInfo.Columns))
columns = append(columns, tbInfo.Columns...)
// replace old column with new column.
for i, col := range columns {
if col.Name.L != originalCol.Name.L {
continue
}
columns[i] = newCol.Clone()
columns[i].Name = originalCol.Name
break
}
err = checkIndexInModifiableColumns(columns, indexInfo.Columns)
if err != nil {
return
}
err = checkIndexPrefixLength(columns, indexInfo.Columns, pkLenAppendToKey)
return
}

err := checkIndexInModifiableColumns(columns, indexInfo.Columns)
// Check primary key first and get "does primary key's column has be modified?" info.
var (
pkModified bool
err error
)
if pkIndex != nil {
pkModified, err = checkOneIndex(pkIndex, 0, true)
if err != nil {
return err
}
}

// Check secondary indexes.
for _, indexInfo := range tbInfo.Indices {
if indexInfo.Primary {
continue
}
var pkLenAppendToKey int
if !indexInfo.Unique {
pkLenAppendToKey = clusteredPkLen
}

err = checkIndexPrefixLength(columns, indexInfo.Columns)
_, err = checkOneIndex(indexInfo, pkLenAppendToKey, !tbInfo.IsCommonHandle || !pkModified)
if err != nil {
return err
}
Expand Down Expand Up @@ -5013,6 +5086,22 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
return errors.Trace(err)
}

if !unique && tblInfo.IsCommonHandle {
// Ensure new created non-unique secondary-index's len + primary-key's len <= MaxIndexLength in clustered index table.
var pkLen, idxLen int
pkLen, err = indexColumnsLen(tblInfo.Columns, tables.FindPrimaryIndex(tblInfo).Columns)
if err != nil {
return err
}
idxLen, err = indexColumnsLen(tblInfo.Columns, indexColumns)
if err != nil {
return err
}
if pkLen+idxLen > config.GetGlobalConfig().MaxIndexLength {
return errTooLongKey.GenWithStackByArgs(config.GetGlobalConfig().MaxIndexLength)
}
}

global := false
if unique && tblInfo.GetPartitionInfo() != nil {
ck, err := checkPartitionKeysConstraint(tblInfo.GetPartitionInfo(), indexColumns, tblInfo)
Expand Down
26 changes: 7 additions & 19 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,13 @@ func checkPKOnGeneratedColumn(tblInfo *model.TableInfo, indexPartSpecifications
return lastCol, nil
}

func checkIndexPrefixLength(columns []*model.ColumnInfo, idxColumns []*model.IndexColumn) error {
// The sum of length of all index columns.
sumLength := 0
for _, ic := range idxColumns {
col := model.FindColumnInfo(columns, ic.Name.L)
if col == nil {
return errKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ic.Name)
}

indexColumnLength, err := getIndexColumnLength(col, ic.Length)
if err != nil {
return err
}
sumLength += indexColumnLength
// The sum of all lengths must be shorter than the max length for prefix.
if sumLength > config.GetGlobalConfig().MaxIndexLength {
return errTooLongKey.GenWithStackByArgs(config.GetGlobalConfig().MaxIndexLength)
}
func checkIndexPrefixLength(columns []*model.ColumnInfo, idxColumns []*model.IndexColumn, pkLenAppendToKey int) error {
idxLen, err := indexColumnsLen(columns, idxColumns)
if err != nil {
return err
}
if idxLen+pkLenAppendToKey > config.GetGlobalConfig().MaxIndexLength {
return errTooLongKey.GenWithStackByArgs(config.GetGlobalConfig().MaxIndexLength)
}
return nil
}
Expand Down Expand Up @@ -489,7 +478,6 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
indexInfo.Global = global
indexInfo.ID = allocateIndexID(tblInfo)
tblInfo.Indices = append(tblInfo.Indices, indexInfo)

if err = checkTooManyIndexes(tblInfo.Indices); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down

0 comments on commit 91bcb21

Please sign in to comment.