From f47130249bb2b7b841ff9427275ed46e7f77d36b Mon Sep 17 00:00:00 2001 From: ris <79858083+RidRisR@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:37:42 +0800 Subject: [PATCH] br: Add pre-check of duplicate table in the downstream (#55044) close pingcap/tidb#55087 --- br/pkg/errors/errors.go | 1 + br/pkg/glue/glue.go | 10 +++++ br/pkg/gluetidb/glue.go | 4 ++ br/pkg/gluetidb/mock/mock.go | 4 ++ br/pkg/gluetikv/glue.go | 4 ++ br/pkg/task/restore.go | 33 +++++++++++++++ errors.toml | 5 +++ pkg/executor/brie.go | 4 ++ tests/realtikvtest/brietest/BUILD.bazel | 1 + tests/realtikvtest/brietest/brie_test.go | 51 ++++++++++++++++++++++++ 10 files changed, 117 insertions(+) diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index 1b78a861fa668..3bd2ab776ccb3 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -70,6 +70,7 @@ var ( ErrRestoreIncompatibleSys = errors.Normalize("incompatible system table", errors.RFCCodeText("BR:Restore:ErrRestoreIncompatibleSys")) ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable")) ErrDatabasesAlreadyExisted = errors.Normalize("databases already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrDatabasesAlreadyExisted")) + ErrTablesAlreadyExisted = errors.Normalize("tables already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrTablesAlreadyExisted")) // ErrStreamLogTaskExist is the error when stream log task already exists, because of supporting single task currently. ErrStreamLogTaskExist = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist")) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 5d5f611fa39e6..1895ee092cc78 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -13,6 +13,13 @@ import ( pd "github.com/tikv/pd/client" ) +type GlueClient int + +const ( + ClientCLP GlueClient = iota + ClientSql +) + // Glue is an abstraction of TiDB function calls used in BR. type Glue interface { GetDomain(store kv.Storage) (*domain.Domain, error) @@ -36,6 +43,9 @@ type Glue interface { // we can close domain as soon as possible. // and we must reuse the exists session and don't close it in SQL backup job. UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error + + // GetClient returns the client type of the glue + GetClient() GlueClient } // Session is an abstraction of the session.Session interface. diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index d7d5c907fddff..7963c14b0f5bb 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -183,6 +183,10 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue return nil } +func (Glue) GetClient() glue.GlueClient { + return glue.ClientCLP +} + // GetSessionCtx implements glue.Glue func (gs *tidbSession) GetSessionCtx() sessionctx.Context { return gs.se diff --git a/br/pkg/gluetidb/mock/mock.go b/br/pkg/gluetidb/mock/mock.go index 42cbd4a814657..dc54f48ebda3a 100644 --- a/br/pkg/gluetidb/mock/mock.go +++ b/br/pkg/gluetidb/mock/mock.go @@ -159,3 +159,7 @@ func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func } return fn(glueSession) } + +func (*MockGlue) GetClient() glue.GlueClient { + return glue.ClientCLP +} diff --git a/br/pkg/gluetikv/glue.go b/br/pkg/gluetikv/glue.go index ad3ae045f478f..2fd990e92d2a5 100644 --- a/br/pkg/gluetikv/glue.go +++ b/br/pkg/gluetikv/glue.go @@ -73,3 +73,7 @@ func (Glue) GetVersion() string { func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error { return nil } + +func (Glue) GetClient() glue.GlueClient { + return glue.ClientCLP +} diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 8bc6383be78b6..3c59650dcd7f0 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/tablecodec" @@ -179,6 +180,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) { _ = flags.MarkHidden(FlagStatsConcurrency) _ = flags.MarkHidden(FlagBatchFlushInterval) _ = flags.MarkHidden(FlagDdlBatchSize) + _ = flags.MarkHidden(flagUseFSR) } // ParseFromFlags parses the config from the flag set. @@ -910,6 +912,11 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if cfg.WithSysTable { client.InitFullClusterRestore(cfg.ExplicitFilter) } + } else if checkpointFirstRun && cfg.CheckRequirements { + if err := checkTableExistence(ctx, mgr, tables, g); err != nil { + schedulersRemovable = true + return errors.Trace(err) + } } if client.IsFullClusterRestore() && client.HasBackedUpSysDB() { @@ -1315,6 +1322,10 @@ func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File, } return base * uint64(ratio*10) / 10 } + + // The preserve rate for tikv is quite accurate, while rate for tiflash is a + // number calculated from tpcc testing with variable data sizes. 1.4 is a + // relative conservative value. tikvUsage := preserve(EstimateTikvUsage(files, maxReplica, tikvCnt), 1.1) tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.4) log.Info("preserved disk space", zap.Uint64("tikv", tikvUsage), zap.Uint64("tiflash", tiflashUsage)) @@ -1358,6 +1369,28 @@ func Exhaust(ec <-chan error) []error { } } +func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table, g glue.Glue) error { + // Tasks from br clp client use other checks to validate + if g.GetClient() != glue.ClientSql { + return nil + } + message := "table already exists: " + allUnique := true + for _, table := range tables { + _, err := mgr.GetDomain().InfoSchema().TableByName(ctx, table.DB.Name, table.Info.Name) + if err == nil { + message += fmt.Sprintf("%s.%s ", table.DB.Name, table.Info.Name) + allUnique = false + } else if !infoschema.ErrTableNotExists.Equal(err) { + return errors.Trace(err) + } + } + if !allUnique { + return errors.Annotate(berrors.ErrTablesAlreadyExisted, message) + } + return nil +} + // EstimateRangeSize estimates the total range count by file. func EstimateRangeSize(files []*backuppb.File) int { result := 0 diff --git a/errors.toml b/errors.toml index bde9ae2e18350..9c02f10d6ac04 100644 --- a/errors.toml +++ b/errors.toml @@ -291,6 +291,11 @@ error = ''' failed to write and ingest ''' +["BR:Restore:ErrTablesAlreadyExisted"] +error = ''' +tables already existed in restored cluster +''' + ["BR:Restore:ErrUnsupportedSysTable"] error = ''' the system table isn't supported for restoring yet diff --git a/pkg/executor/brie.go b/pkg/executor/brie.go index 130f627bc6609..dd019e30b67b3 100644 --- a/pkg/executor/brie.go +++ b/pkg/executor/brie.go @@ -753,6 +753,10 @@ func (gs *tidbGlue) UseOneShotSession(_ kv.Storage, _ bool, fn func(se glue.Sess return fn(glueSession) } +func (*tidbGlue) GetClient() glue.GlueClient { + return glue.ClientSql +} + type tidbGlueSession struct { // the session context of the brie task's subtask, such as `CREATE TABLE`. se sessionctx.Context diff --git a/tests/realtikvtest/brietest/BUILD.bazel b/tests/realtikvtest/brietest/BUILD.bazel index f1a20bff34dff..70f784fb3d07e 100644 --- a/tests/realtikvtest/brietest/BUILD.bazel +++ b/tests/realtikvtest/brietest/BUILD.bazel @@ -18,6 +18,7 @@ go_test( "//pkg/config", "//pkg/executor", "//pkg/parser/mysql", + "//pkg/session", "//pkg/sessionctx/binloginfo", "//pkg/store/mockstore/mockcopr", "//pkg/testkit", diff --git a/tests/realtikvtest/brietest/brie_test.go b/tests/realtikvtest/brietest/brie_test.go index c72b164b66058..e1c5abd475a54 100644 --- a/tests/realtikvtest/brietest/brie_test.go +++ b/tests/realtikvtest/brietest/brie_test.go @@ -15,6 +15,7 @@ package brietest import ( + "context" "fmt" "os" "strings" @@ -24,6 +25,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/executor" + "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -60,6 +62,7 @@ func TestShowBackupQuery(t *testing.T) { restoreQuery := fmt.Sprintf("RESTORE TABLE `test`.`foo` FROM 'local://%s'", sqlTmp) tk.MustQuery(restoreQuery) res = tk.MustQuery("show br job query 2;") + tk.MustExec("drop table foo;") res.CheckContain(restoreQuery) } @@ -68,6 +71,7 @@ func TestShowBackupQueryRedact(t *testing.T) { executor.ResetGlobalBRIEQueueForTest() failpoint.Enable("github.com/pingcap/tidb/pkg/executor/block-on-brie", "return") + defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/block-on-brie") ch := make(chan any) go func() { tk := testkit.NewTestKit(t, tk.Session().GetStore()) @@ -102,6 +106,7 @@ func TestCancel(t *testing.T) { executor.ResetGlobalBRIEQueueForTest() tk.MustExec("use test;") failpoint.Enable("github.com/pingcap/tidb/pkg/executor/block-on-brie", "return") + defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/block-on-brie") req := require.New(t) ch := make(chan struct{}) @@ -126,3 +131,49 @@ func TestCancel(t *testing.T) { req.FailNow("the backup job doesn't be canceled") } } + +func TestExistedTables(t *testing.T) { + tk := initTestKit(t) + tmp := makeTempDirForBackup(t) + sqlTmp := strings.ReplaceAll(tmp, "'", "''") + executor.ResetGlobalBRIEQueueForTest() + tk.MustExec("use test;") + for i := 0; i < 10; i++ { + tableName := fmt.Sprintf("foo%d", i) + tk.MustExec(fmt.Sprintf("create table %s(pk int primary key auto_increment, v varchar(255));", tableName)) + tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ","))) + } + + done := make(chan struct{}) + go func() { + defer close(done) + backupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s'", sqlTmp) + _ = tk.MustQuery(backupQuery) + }() + select { + case <-time.After(20 * time.Second): + t.Fatal("Backup operation exceeded") + case <-done: + } + + done = make(chan struct{}) + go func() { + defer close(done) + restoreQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s'", sqlTmp) + res, err := tk.Exec(restoreQuery) + require.NoError(t, err) + + _, err = session.ResultSetToStringSlice(context.Background(), tk.Session(), res) + require.ErrorContains(t, err, "table already exists") + }() + select { + case <-time.After(20 * time.Second): + t.Fatal("Restore operation exceeded") + case <-done: + } + + for i := 0; i < 10; i++ { + tableName := fmt.Sprintf("foo%d", i) + tk.MustExec(fmt.Sprintf("drop table %s;", tableName)) + } +}