Skip to content

Commit

Permalink
Merge pull request pingcap#3 from zimulala/zimuxia/rename-db-4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 authored Apr 29, 2020
2 parents 9c7f792 + 11e2b23 commit 44a44da
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 12 deletions.
22 changes: 22 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,28 @@ func (s *testDBSuite3) TestTruncateTable(c *C) {

}

func (s *testDBSuite3) TestRenameDatabase(c *C) {
s.tk = testkit.NewTestKitWithInit(c, s.store)
s.tk.MustExec("create database db charset utf8 collate utf8_general_ci;")
s.tk.MustExec("use db")
s.tk.MustExec("create table t(c1 int, c2 int)")
ctx := s.tk.Se.(sessionctx.Context)
is := domain.GetDomain(ctx).InfoSchema()
dbInfo, ok := is.SchemaByName(model.NewCIStr("db"))
c.Assert(ok, IsTrue)
oldDatabaseID := dbInfo.ID
s.tk.MustExec("rename database db to newDB")
is = domain.GetDomain(ctx).InfoSchema()
newDBInfo, ok := is.SchemaByName(model.NewCIStr("newDB"))
c.Assert(ok, IsTrue)
c.Assert(oldDatabaseID, Equals, newDBInfo.ID)

// for failure cases
s.tk.MustGetErrCode("rename database db_not_exit to db", errno.ErrBadDB)
s.tk.MustGetErrCode("rename database newDB to test", errno.ErrDBCreateExists)
s.tk.MustGetErrCode("rename database newDB to dbxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", errno.ErrTooLongIdent)
}

func (s *testDBSuite4) TestRenameTable(c *C) {
isAlterTable := false
s.testRenameTable(c, "rename table %s to %s", isAlterTable)
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type DDL interface {
CreateSchema(ctx sessionctx.Context, name model.CIStr, charsetInfo *ast.CharsetOpt) error
AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
RenameSchema(ctx sessionctx.Context, oldDB, newDB model.CIStr) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
Expand Down
27 changes: 27 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,33 @@ func (d *ddl) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (
return errors.Trace(err)
}

func (d *ddl) RenameSchema(ctx sessionctx.Context, oldDB, newDB model.CIStr) error {
is := d.GetInfoSchemaWithInterceptor(ctx)
oldSchema, ok := is.SchemaByName(oldDB)
if !ok {
return errors.Trace(infoschema.ErrDatabaseNotExists)
}
_, ok = is.SchemaByName(newDB)
if ok {
return errors.Trace(infoschema.ErrDatabaseExists)
}
if err := checkTooLongSchema(newDB); err != nil {
return errors.Trace(err)
}

job := &model.Job{
SchemaID: oldSchema.ID,
SchemaName: oldSchema.Name.L,
Type: model.ActionRenameDatabase,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newDB},
}

err := d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
old, ok := is.SchemaByName(schema)
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onRebaseAutoRandomType(d.store, t, job)
case model.ActionRenameTable:
ver, err = onRenameTable(d, t, job)
case model.ActionRenameDatabase:
ver, err = onRenameSchema(t, job)
case model.ActionShardRowID:
ver, err = w.onShardRowID(d, t, job)
case model.ActionModifyTableComment:
Expand Down
8 changes: 8 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionAddPrimaryKey, jobIDs: []int64{firstID + 35}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 35)}, cancelState: model.StatePublic},
{act: model.ActionDropPrimaryKey, jobIDs: []int64{firstID + 36}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly},
{act: model.ActionDropPrimaryKey, jobIDs: []int64{firstID + 37}, cancelRetErrs: []error{admin.ErrCannotCancelDDLJob.GenWithStackByArgs(firstID + 37)}, cancelState: model.StateDeleteOnly},

{act: model.ActionRenameDatabase, jobIDs: []int64{firstID + 38}, cancelRetErrs: noErrs, cancelState: model.StateNone},
}

return tests
Expand Down Expand Up @@ -851,6 +853,12 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testDropIndex(c, ctx, d, dbInfo, tblInfo, idxOrigName)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkDropIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// for rename database
updateTest(&tests[33])
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo1.ID, 0, model.ActionRenameDatabase, []interface{}{"newDB"}, &cancelState)
c.Check(checkErr, IsNil)
testCheckSchemaState(c, d, dbInfo, model.StatePublic)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
51 changes: 39 additions & 12 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func onCreateSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
dbInfo.ID = schemaID
dbInfo.State = model.StateNone

err := checkSchemaNotExists(d, t, schemaID, dbInfo)
err := checkSchemaNotExists(d, t, schemaID, dbInfo.Name)
if err != nil {
if infoschema.ErrDatabaseExists.Equal(err) {
// The database already exists, can't create it, we should cancel this job now.
Expand Down Expand Up @@ -63,10 +63,10 @@ func onCreateSchema(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
}
}

func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error {
func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbName model.CIStr) error {
// d.infoHandle maybe nil in some test.
if d.infoHandle == nil {
return checkSchemaNotExistsFromStore(t, schemaID, dbInfo)
return checkSchemaNotExistsFromStore(t, schemaID, dbName)
}
// Try to use memory schema info to check first.
currVer, err := t.GetSchemaVersion()
Expand All @@ -75,35 +75,34 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model
}
is := d.infoHandle.Get()
if is.SchemaMetaVersion() == currVer {
return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo)
return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbName)
}
return checkSchemaNotExistsFromStore(t, schemaID, dbInfo)
return checkSchemaNotExistsFromStore(t, schemaID, dbName)
}

func checkSchemaNotExistsFromInfoSchema(is infoschema.InfoSchema, schemaID int64, dbInfo *model.DBInfo) error {
func checkSchemaNotExistsFromInfoSchema(is infoschema.InfoSchema, schemaID int64, dbName model.CIStr) error {
// Check database exists by name.
if is.SchemaExists(dbInfo.Name) {
return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbInfo.Name)
if is.SchemaExists(dbName) {
return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbName)
}
// Check database exists by ID.
if _, ok := is.SchemaByID(schemaID); ok {
return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbInfo.Name)
return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbName)
}
return nil
}

func checkSchemaNotExistsFromStore(t *meta.Meta, schemaID int64, dbInfo *model.DBInfo) error {
func checkSchemaNotExistsFromStore(t *meta.Meta, schemaID int64, dbName model.CIStr) error {
dbs, err := t.ListDatabases()
if err != nil {
return errors.Trace(err)
}

for _, db := range dbs {
if db.Name.L == dbInfo.Name.L {
if db.Name.L == dbName.L {
if db.ID != schemaID {
return infoschema.ErrDatabaseExists.GenWithStackByArgs(db.Name)
}
dbInfo = db
}
}
return nil
Expand Down Expand Up @@ -140,6 +139,34 @@ func onModifySchemaCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _
return ver, nil
}

func onRenameSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var newDBName model.CIStr
if err := job.DecodeArgs(&newDBName); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if err = checkSchemaNotExistsFromStore(t, job.SchemaID, newDBName); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

dbInfo.Name = newDBName
if err = t.UpdateDatabase(dbInfo); err != nil {
return ver, errors.Trace(err)
}
if ver, err = updateSchemaVersion(t, job); err != nil {
return ver, errors.Trace(err)
}
job.FinishDBJob(model.JobStateDone, model.StatePublic, ver, dbInfo)
return ver, nil
}

func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
err = e.executeFlashbackTable(x)
case *ast.RenameTableStmt:
err = e.executeRenameTable(x)
case *ast.RenameDatabaseStmt:
err = e.executeRenameDatabase(x)
case *ast.TruncateTableStmt:
err = e.executeTruncateTable(x)
case *ast.LockTablesStmt:
Expand Down Expand Up @@ -152,6 +154,19 @@ func (e *DDLExec) executeTruncateTable(s *ast.TruncateTableStmt) error {
return err
}

func (e *DDLExec) executeRenameDatabase(s *ast.RenameDatabaseStmt) error {
oldDB := model.NewCIStr(s.OldDB)
newDB := model.NewCIStr(s.NewDB)

// Protect important system table from been dropped by a mistake.
// I can hardly find a case that a user really need to do this.
if oldDB.L == "mysql" {
return errors.New("Rename 'mysql' database is forbidden")
}

return domain.GetDomain(e.ctx).DDL().RenameSchema(e.ctx, oldDB, newDB)
}

func (e *DDLExec) executeRenameTable(s *ast.RenameTableStmt) error {
if len(s.TableToTables) != 1 {
// Now we only allow one schema changing at the same time.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,5 @@ require (
)

go 1.13

replace github.com/pingcap/parser => github.com/bb7133/parser v0.0.0-20200429032743-4494e5f18a8d
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/appleboy/gin-jwt/v2 v2.6.3/go.mod h1:MfPYA4ogzvOcVkRwAxT7quHOtQmVKDpTwxyUrC2DNw0=
github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw=
github.com/bb7133/parser v0.0.0-20200429032743-4494e5f18a8d h1:cSjNiwPNKJfxrRP+2w6hG9mnWW9eWFYFrCcifRm1D6g=
github.com/bb7133/parser v0.0.0-20200429032743-4494e5f18a8d/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down Expand Up @@ -388,6 +390,8 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zimulala/parser v0.0.0-20200428092446-11e0c7903208 h1:+UobXq2Tc+lu10/7YZOFdSXThYlxLc3RkH7jShkmZig=
github.com/zimulala/parser v0.0.0-20200428092446-11e0c7903208/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
Expand Down
21 changes: 21 additions & 0 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
} else if diff.Type == model.ActionModifySchemaCharsetAndCollate {
return nil, b.applyModifySchemaCharsetAndCollate(m, diff)
}
if diff.Type == model.ActionRenameDatabase {
return nil, b.applyRenameSchema(m, diff)
}
roDBInfo, ok := b.is.SchemaByID(diff.SchemaID)
if !ok {
return nil, ErrDatabaseNotExists.GenWithStackByArgs(
Expand Down Expand Up @@ -188,6 +191,24 @@ func (b *Builder) applyModifySchemaCharsetAndCollate(m *meta.Meta, diff *model.S
return nil
}

func (b *Builder) applyRenameSchema(m *meta.Meta, diff *model.SchemaDiff) error {
di, err := m.GetDatabase(diff.SchemaID)
if err != nil {
return errors.Trace(err)
}
oldDB, ok := b.is.SchemaByID(diff.SchemaID)
if di == nil || ok {
// This should never happen.
return ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", diff.SchemaID),
)
}

newDbInfo := b.copySchemaTables(oldDB.Name.O)
newDbInfo.Name = di.Name
return nil
}

func (b *Builder) applyDropSchema(schemaID int64) []int64 {
di, ok := b.is.SchemaByID(schemaID)
if !ok {
Expand Down
13 changes: 13 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2666,6 +2666,19 @@ func (b *PlanBuilder) buildDDL(ctx context.Context, node ast.DDLNode) (Plan, err
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.CreatePriv, v.Name.Schema.L,
v.Name.Name.L, "", authErr)
case *ast.RenameDatabaseStmt:
if b.ctx.GetSessionVars().User != nil {
authErr = ErrDBaccessDenied.GenWithStackByArgs(b.ctx.GetSessionVars().User.Username,
b.ctx.GetSessionVars().User.Hostname, v.OldDB)
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.DropPriv, v.OldDB,
"", "", authErr)
if b.ctx.GetSessionVars().User != nil {
authErr = ErrDBaccessDenied.GenWithStackByArgs(b.ctx.GetSessionVars().User.Username,
b.ctx.GetSessionVars().User.Hostname, v.NewDB)
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.CreatePriv, v.NewDB,
"", "", authErr)
case *ast.DropDatabaseStmt:
if b.ctx.GetSessionVars().User != nil {
authErr = ErrDBaccessDenied.GenWithStackByArgs(b.ctx.GetSessionVars().User.Username,
Expand Down
12 changes: 12 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
p.checkCreateDatabaseGrammar(node)
case *ast.AlterDatabaseStmt:
p.checkAlterDatabaseGrammar(node)
case *ast.RenameDatabaseStmt:
p.checkRenameDatabaseGrammar(node)
case *ast.DropDatabaseStmt:
p.checkDropDatabaseGrammar(node)
case *ast.ShowStmt:
Expand Down Expand Up @@ -391,6 +393,16 @@ func (p *preprocessor) checkAlterDatabaseGrammar(stmt *ast.AlterDatabaseStmt) {
}
}

func (p *preprocessor) checkRenameDatabaseGrammar(stmt *ast.RenameDatabaseStmt) {
if isIncorrectName(stmt.OldDB) {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.OldDB)
return
}
if isIncorrectName(stmt.NewDB) {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.NewDB)
}
}

func (p *preprocessor) checkDropDatabaseGrammar(stmt *ast.DropDatabaseStmt) {
if isIncorrectName(stmt.Name) {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.Name)
Expand Down

0 comments on commit 44a44da

Please sign in to comment.