Skip to content

Commit

Permalink
ddl: fix alter table charset bug and compatibility (#9790)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Apr 4, 2019
1 parent 62b209c commit b2c7f08
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 38 deletions.
111 changes: 106 additions & 5 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ func (s *testIntegrationSuite) TestChangingTableCharset(c *C) {
if rs != nil {
rs.Close()
}
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[parser:1115]Unknown character set: 'gbk'")
rs, err = tk.Exec("alter table t charset utf8")
if rs != nil {
Expand All @@ -600,6 +601,107 @@ func (s *testIntegrationSuite) TestChangingTableCharset(c *C) {

rs, err = tk.Exec("alter table t charset utf8mb4 collate utf8mb4_bin")
c.Assert(err, NotNil)

rs, err = tk.Exec("alter table t charset ''")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[parser:1115]Unknown character set: ''")

rs, err = tk.Exec("alter table t collate ''")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1273]Unknown collation: ''")

rs, err = tk.Exec("alter table t charset utf8mb4 collate '' collate utf8mb4_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1273]Unknown collation: ''")

rs, err = tk.Exec("alter table t charset latin1 charset utf8 charset utf8mb4 collate utf8_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1302]Conflicting declarations: 'CHARACTER SET latin1' and 'CHARACTER SET utf8'")

rs, err = tk.Exec("alter table t charset utf8 collate utf8mb4_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1253]COLLATION 'utf8mb4_bin' is not valid for CHARACTER SET 'utf8'")

rs, err = tk.Exec("alter table t charset utf8 collate utf8_bin collate utf8mb4_bin collate utf8_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1253]COLLATION 'utf8mb4_bin' is not valid for CHARACTER SET 'utf8'")

// Test change column charset when changing table charset.
tk.MustExec("drop table t;")
tk.MustExec("create table t(a varchar(10)) charset utf8")
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset := func() {
tbl := testGetTableByName(c, s.ctx, "test", "t")
c.Assert(tbl, NotNil)
c.Assert(tbl.Meta().Charset, Equals, charset.CharsetUTF8MB4)
c.Assert(tbl.Meta().Collate, Equals, charset.CollationUTF8MB4)
for _, col := range tbl.Meta().Columns {
c.Assert(col.Charset, Equals, charset.CharsetUTF8MB4)
c.Assert(col.Collate, Equals, charset.CollationUTF8MB4)
}
}
checkCharset()

// Test when column charset can not convert to the target charset.
tk.MustExec("drop table t;")
tk.MustExec("create table t(a varchar(10) character set ascii) charset utf8mb4")
_, err = tk.Exec("alter table t convert to charset utf8mb4;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:210]unsupported modify charset from ascii to utf8mb4")

// Test when table charset is equal to target charset but column charset is not equal.
tk.MustExec("drop table t;")
tk.MustExec("create table t(a varchar(10) character set utf8) charset utf8mb4")
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset()

// Mock table info with charset is "". Old TiDB maybe create table with charset is "".
db, ok := domain.GetDomain(s.ctx).InfoSchema().SchemaByName(model.NewCIStr("test"))
c.Assert(ok, IsTrue)
tbl := testGetTableByName(c, s.ctx, "test", "t")
tblInfo := tbl.Meta().Clone()
tblInfo.Charset = ""
tblInfo.Collate = ""
updateTableInfo := func(tblInfo *model.TableInfo) {
mockCtx := mock.NewContext()
mockCtx.Store = s.store
err = mockCtx.NewTxn(context.Background())
c.Assert(err, IsNil)
txn, err := mockCtx.Txn(true)
c.Assert(err, IsNil)
mt := meta.NewMeta(txn)

err = mt.UpdateTable(db.ID, tblInfo)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
}
updateTableInfo(tblInfo)

// check table charset is ""
tk.MustExec("alter table t add column b varchar(10);") // load latest schema.
tbl = testGetTableByName(c, s.ctx, "test", "t")
c.Assert(tbl, NotNil)
c.Assert(tbl.Meta().Charset, Equals, "")
c.Assert(tbl.Meta().Collate, Equals, "")
// Test when table charset is "", this for compatibility.
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset()

// Test when column charset is "".
tbl = testGetTableByName(c, s.ctx, "test", "t")
tblInfo = tbl.Meta().Clone()
tblInfo.Columns[0].Charset = ""
tblInfo.Columns[0].Collate = ""
updateTableInfo(tblInfo)
// check table charset is ""
tk.MustExec("alter table t drop column b;") // load latest schema.
tbl = testGetTableByName(c, s.ctx, "test", "t")
c.Assert(tbl, NotNil)
c.Assert(tbl.Meta().Columns[0].Charset, Equals, "")
c.Assert(tbl.Meta().Columns[0].Collate, Equals, "")
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset()
}

func (s *testIntegrationSuite) TestCaseInsensitiveCharsetAndCollate(c *C) {
Expand Down Expand Up @@ -1419,15 +1521,14 @@ func (s *testIntegrationSuite) TestTreatOldVersionUTF8AsUTF8MB4(c *C) {

// Test for alter table convert charset
config.GetGlobalConfig().TreatOldVersionUTF8AsUTF8MB4 = true
s.tk.MustExec("alter table t change column b b varchar(40) character set ascii") // reload schema.
s.tk.MustExec("alter table t drop column b") // reload schema.
s.tk.MustExec("alter table t convert to charset utf8mb4;")

config.GetGlobalConfig().TreatOldVersionUTF8AsUTF8MB4 = false
s.tk.MustExec("alter table t change column b b varchar(50) character set ascii") // reload schema.
// TODO: fix this after PR 9790.
s.tk.MustExec("alter table t add column b varchar(50);") // reload schema.
s.tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n" +
" `a` varchar(20) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,\n" +
" `b` varchar(50) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL\n" +
" `a` varchar(20) DEFAULT NULL,\n" +
" `b` varchar(50) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}

Expand Down
12 changes: 12 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ var (
ErrWrongNameForIndex = terror.ClassDDL.New(codeWrongNameForIndex, mysql.MySQLErrName[mysql.ErrWrongNameForIndex])
// ErrUnknownCharacterSet returns unknown character set.
ErrUnknownCharacterSet = terror.ClassDDL.New(codeUnknownCharacterSet, "Unknown character set: '%s'")
// ErrUnknownCollation returns unknown collation.
ErrUnknownCollation = terror.ClassDDL.New(codeUnknownCollation, "Unknown collation: '%s'")
// ErrCollationCharsetMismatch returns when collation not match the charset.
ErrCollationCharsetMismatch = terror.ClassDDL.New(codeCollationCharsetMismatch, mysql.MySQLErrName[mysql.ErrCollationCharsetMismatch])
// ErrConflictingDeclarations return conflict declarations.
ErrConflictingDeclarations = terror.ClassDDL.New(codeConflictingDeclarations, "Conflicting declarations: 'CHARACTER SET %s' and 'CHARACTER SET %s'")
// ErrPrimaryCantHaveNull returns All parts of a PRIMARY KEY must be NOT NULL; if you need NULL in a key, use UNIQUE instead
ErrPrimaryCantHaveNull = terror.ClassDDL.New(codePrimaryCantHaveNull, mysql.MySQLErrName[mysql.ErrPrimaryCantHaveNull])

Expand Down Expand Up @@ -688,6 +694,9 @@ const (
codeWrongNameForIndex = terror.ErrCode(mysql.ErrWrongNameForIndex)
codeErrTooLongIndexComment = terror.ErrCode(mysql.ErrTooLongIndexComment)
codeUnknownCharacterSet = terror.ErrCode(mysql.ErrUnknownCharacterSet)
codeUnknownCollation = terror.ErrCode(mysql.ErrUnknownCollation)
codeCollationCharsetMismatch = terror.ErrCode(mysql.ErrCollationCharsetMismatch)
codeConflictingDeclarations = terror.ErrCode(mysql.ErrConflictingDeclarations)
codeCantCreateTable = terror.ErrCode(mysql.ErrCantCreateTable)
codeTableMustHaveColumns = terror.ErrCode(mysql.ErrTableMustHaveColumns)
codePartitionsMustBeDefined = terror.ErrCode(mysql.ErrPartitionsMustBeDefined)
Expand Down Expand Up @@ -747,6 +756,9 @@ func init() {
codeErrTooLongIndexComment: mysql.ErrTooLongIndexComment,
codeViewWrongList: mysql.ErrViewWrongList,
codeUnknownCharacterSet: mysql.ErrUnknownCharacterSet,
codeUnknownCollation: mysql.ErrUnknownCollation,
codeCollationCharsetMismatch: mysql.ErrCollationCharsetMismatch,
codeConflictingDeclarations: mysql.ErrConflictingDeclarations,
codePartitionsMustBeDefined: mysql.ErrPartitionsMustBeDefined,
codePartitionMgmtOnNonpartitioned: mysql.ErrPartitionMgmtOnNonpartitioned,
codeDropPartitionNonExistent: mysql.ErrDropPartitionNonExistent,
Expand Down
132 changes: 102 additions & 30 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,27 @@ func ResolveCharsetCollation(tblCharset, dbCharset string) (string, string, erro
return charset, collate, nil
}

func typesNeedCharset(tp byte) bool {
switch tp {
case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString,
mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob,
mysql.TypeEnum, mysql.TypeSet:
return true
}
return false
}

func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCharset string) error {
tp.Charset = strings.ToLower(tp.Charset)
tp.Collate = strings.ToLower(tp.Collate)
if len(tp.Charset) == 0 {
switch tp.Tp {
case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeEnum, mysql.TypeSet:
if typesNeedCharset(tp.Tp) {
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
}
default:
} else {
tp.Charset = charset.CharsetBin
tp.Collate = charset.CharsetBin
}
Expand Down Expand Up @@ -1579,9 +1588,9 @@ func isIgnorableSpec(tp ast.AlterTableType) bool {
// getCharsetAndCollateInTableOption will iterate the charset and collate in the options,
// and returns the last charset and collate in options. If there is no charset in the options,
// the returns charset will be "", the same as collate.
func getCharsetAndCollateInTableOption(startIdx int, options []*ast.TableOption) (charset, collate string) {
charsets := make([]string, len(options))
collates := make([]string, len(options))
func getCharsetAndCollateInTableOption(startIdx int, options []*ast.TableOption) (ca, co string, err error) {
charsets := make([]string, 0, len(options))
collates := make([]string, 0, len(options))
for i := startIdx; i < len(options); i++ {
opt := options[i]
// we set the charset to the last option. example: alter table t charset latin1 charset utf8 collate utf8_bin;
Expand All @@ -1594,12 +1603,25 @@ func getCharsetAndCollateInTableOption(startIdx int, options []*ast.TableOption)
}
}

if len(charsets) != 0 {
charset = charsets[len(charsets)-1]
if len(charsets) > 1 {
return "", "", ErrConflictingDeclarations.GenWithStackByArgs(charsets[0], charsets[1])
}
if len(charsets) == 1 {
if charsets[0] == "" {
return "", "", ErrUnknownCharacterSet.GenWithStackByArgs("")
}
ca = charsets[0]
}

if len(collates) != 0 {
collate = collates[len(collates)-1]
for i := range collates {
if collates[i] == "" {
return "", "", ErrUnknownCollation.GenWithStackByArgs("")
}
if len(ca) != 0 && !charset.ValidCharsetAndCollation(ca, collates[i]) {
return "", "", ErrCollationCharsetMismatch.GenWithStackByArgs(collates[i], ca)
}
}
co = collates[len(collates)-1]
}
return
}
Expand Down Expand Up @@ -1725,7 +1747,11 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
if handledCharsetOrCollate {
continue
}
toCharset, toCollate := getCharsetAndCollateInTableOption(i, spec.Options)
var toCharset, toCollate string
toCharset, toCollate, err = getCharsetAndCollateInTableOption(i, spec.Options)
if err != nil {
return err
}
err = d.AlterTableCharsetAndCollate(ctx, ident, toCharset, toCollate)
handledCharsetOrCollate = true
}
Expand Down Expand Up @@ -2092,7 +2118,7 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, colName model.CIS
// modifiableCharsetAndCollation returns error when the charset or collation is not modifiable.
func modifiableCharsetAndCollation(toCharset, toCollate, origCharset, origCollate string) error {
if !charset.ValidCharsetAndCollation(toCharset, toCollate) {
return ErrUnknownCharacterSet.GenWithStackByArgs(toCharset, toCollate)
return ErrUnknownCharacterSet.GenWithStack("Unknown character set: '%s', collation: '%s'", toCharset, toCollate)
}
if toCharset == charset.CharsetUTF8MB4 && origCharset == charset.CharsetUTF8 {
// TiDB only allow utf8 to be changed to utf8mb4.
Expand Down Expand Up @@ -2542,11 +2568,9 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
}

origCharset := tb.Meta().Charset
origCollate := tb.Meta().Collate
if toCharset == "" {
// charset does not change.
toCharset = origCharset
toCharset = tb.Meta().Charset
}

if toCollate == "" {
Expand All @@ -2556,24 +2580,14 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}
}
// Old version schema charset maybe modified when load schema if TreatOldVersionUTF8AsUTF8MB4 was enable.
// So even if the origCharset equal toCharset, we still need to do the ddl for old version schema.
if origCharset == toCharset && origCollate == toCollate && tb.Meta().Version >= model.TableInfoVersion2 {
// nothing to do.
return nil
doNothing, err := checkAlterTableCharset(tb.Meta(), schema, toCharset, toCollate)
if err != nil {
return err
}

if err = modifiableCharsetAndCollation(toCharset, toCollate, origCharset, origCollate); err != nil {
return errors.Trace(err)
if doNothing {
return nil
}

for _, col := range tb.Meta().Cols() {
if col.Tp == mysql.TypeVarchar {
if err = IsTooBigFieldLength(col.Flen, col.Name.O, toCharset); err != nil {
return errors.Trace(err)
}
}
}
job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
Expand All @@ -2586,6 +2600,64 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}

// checkAlterTableCharset uses to check is it possible to change the charset of table.
// This function returns 2 variable:
// doNothing: if doNothing is true, means no need to change any more, because the target charset is same with the charset of table.
// err: if err is not nil, means it is not possible to change table charset to target charset.
func checkAlterTableCharset(tblInfo *model.TableInfo, dbInfo *model.DBInfo, toCharset, toCollate string) (doNothing bool, err error) {
origCharset := tblInfo.Charset
origCollate := tblInfo.Collate
// Old version schema charset maybe modified when load schema if TreatOldVersionUTF8AsUTF8MB4 was enable.
// So even if the origCharset equal toCharset, we still need to do the ddl for old version schema.
if origCharset == toCharset && origCollate == toCollate && tblInfo.Version >= model.TableInfoVersion2 {
// nothing to do.
doNothing = true
for _, col := range tblInfo.Columns {
if col.Charset == charset.CharsetBin {
continue
}
if col.Charset == toCharset && col.Collate == toCollate {
continue
}
doNothing = false
}
if doNothing {
return doNothing, nil
}
}

if len(origCharset) == 0 {
// The table charset may be "", if the table is create in old TiDB version, such as v2.0.8.
// This DDL will update the table charset to default charset.
origCharset, origCollate, err = ResolveCharsetCollation("", dbInfo.Charset)
if err != nil {
return doNothing, err
}
}

if err = modifiableCharsetAndCollation(toCharset, toCollate, origCharset, origCollate); err != nil {
return doNothing, err
}

for _, col := range tblInfo.Columns {
if col.Tp == mysql.TypeVarchar {
if err = IsTooBigFieldLength(col.Flen, col.Name.O, toCharset); err != nil {
return doNothing, err
}
}
if col.Charset == charset.CharsetBin {
continue
}
if len(col.Charset) == 0 {
continue
}
if err = modifiableCharsetAndCollation(toCharset, toCollate, col.Charset, col.Collate); err != nil {
return doNothing, err
}
}
return doNothing, nil
}

// RenameIndex renames an index.
// In TiDB, indexes are case-insensitive (so index 'a' and 'A" are considered the same index),
// but index names are case-sensitive (we can rename index 'a' to 'A')
Expand Down
2 changes: 1 addition & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func rollingbackDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, err
}

func rollingbackDropSchema(t *meta.Meta, job *model.Job) error {
dbInfo, err := checkDropSchema(t, job)
dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func onCreateSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
dbInfo, err := checkDropSchema(t, job)
dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -120,7 +120,7 @@ func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

func checkDropSchema(t *meta.Meta, job *model.Job) (*model.DBInfo, error) {
func checkSchemaExistAndCancelNotExistJob(t *meta.Meta, job *model.Job) (*model.DBInfo, error) {
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading

0 comments on commit b2c7f08

Please sign in to comment.