Skip to content

Commit

Permalink
br: Add pre-check of duplicate table in the downstream (#55044)
Browse files Browse the repository at this point in the history
close #55087
  • Loading branch information
RidRisR committed Aug 20, 2024
1 parent 09b85fb commit f471302
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 0 deletions.
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var (
ErrRestoreIncompatibleSys = errors.Normalize("incompatible system table", errors.RFCCodeText("BR:Restore:ErrRestoreIncompatibleSys"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))
ErrDatabasesAlreadyExisted = errors.Normalize("databases already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrDatabasesAlreadyExisted"))
ErrTablesAlreadyExisted = errors.Normalize("tables already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrTablesAlreadyExisted"))

// ErrStreamLogTaskExist is the error when stream log task already exists, because of supporting single task currently.
ErrStreamLogTaskExist = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist"))
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
pd "github.com/tikv/pd/client"
)

type GlueClient int

const (
ClientCLP GlueClient = iota
ClientSql
)

// Glue is an abstraction of TiDB function calls used in BR.
type Glue interface {
GetDomain(store kv.Storage) (*domain.Domain, error)
Expand All @@ -36,6 +43,9 @@ type Glue interface {
// we can close domain as soon as possible.
// and we must reuse the exists session and don't close it in SQL backup job.
UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error

// GetClient returns the client type of the glue
GetClient() GlueClient
}

// Session is an abstraction of the session.Session interface.
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue
return nil
}

func (Glue) GetClient() glue.GlueClient {
return glue.ClientCLP
}

// GetSessionCtx implements glue.Glue
func (gs *tidbSession) GetSessionCtx() sessionctx.Context {
return gs.se
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/gluetidb/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,7 @@ func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func
}
return fn(glueSession)
}

func (*MockGlue) GetClient() glue.GlueClient {
return glue.ClientCLP
}
4 changes: 4 additions & 0 deletions br/pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ func (Glue) GetVersion() string {
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
return nil
}

func (Glue) GetClient() glue.GlueClient {
return glue.ClientCLP
}
33 changes: 33 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -179,6 +180,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
_ = flags.MarkHidden(FlagStatsConcurrency)
_ = flags.MarkHidden(FlagBatchFlushInterval)
_ = flags.MarkHidden(FlagDdlBatchSize)
_ = flags.MarkHidden(flagUseFSR)
}

// ParseFromFlags parses the config from the flag set.
Expand Down Expand Up @@ -910,6 +912,11 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if cfg.WithSysTable {
client.InitFullClusterRestore(cfg.ExplicitFilter)
}
} else if checkpointFirstRun && cfg.CheckRequirements {
if err := checkTableExistence(ctx, mgr, tables, g); err != nil {
schedulersRemovable = true
return errors.Trace(err)
}
}

if client.IsFullClusterRestore() && client.HasBackedUpSysDB() {
Expand Down Expand Up @@ -1315,6 +1322,10 @@ func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File,
}
return base * uint64(ratio*10) / 10
}

// The preserve rate for tikv is quite accurate, while rate for tiflash is a
// number calculated from tpcc testing with variable data sizes. 1.4 is a
// relative conservative value.
tikvUsage := preserve(EstimateTikvUsage(files, maxReplica, tikvCnt), 1.1)
tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.4)
log.Info("preserved disk space", zap.Uint64("tikv", tikvUsage), zap.Uint64("tiflash", tiflashUsage))
Expand Down Expand Up @@ -1358,6 +1369,28 @@ func Exhaust(ec <-chan error) []error {
}
}

func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table, g glue.Glue) error {
// Tasks from br clp client use other checks to validate
if g.GetClient() != glue.ClientSql {
return nil
}
message := "table already exists: "
allUnique := true
for _, table := range tables {
_, err := mgr.GetDomain().InfoSchema().TableByName(ctx, table.DB.Name, table.Info.Name)
if err == nil {
message += fmt.Sprintf("%s.%s ", table.DB.Name, table.Info.Name)
allUnique = false
} else if !infoschema.ErrTableNotExists.Equal(err) {
return errors.Trace(err)
}
}
if !allUnique {
return errors.Annotate(berrors.ErrTablesAlreadyExisted, message)
}
return nil
}

// EstimateRangeSize estimates the total range count by file.
func EstimateRangeSize(files []*backuppb.File) int {
result := 0
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ error = '''
failed to write and ingest
'''

["BR:Restore:ErrTablesAlreadyExisted"]
error = '''
tables already existed in restored cluster
'''

["BR:Restore:ErrUnsupportedSysTable"]
error = '''
the system table isn't supported for restoring yet
Expand Down
4 changes: 4 additions & 0 deletions pkg/executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,10 @@ func (gs *tidbGlue) UseOneShotSession(_ kv.Storage, _ bool, fn func(se glue.Sess
return fn(glueSession)
}

func (*tidbGlue) GetClient() glue.GlueClient {
return glue.ClientSql
}

type tidbGlueSession struct {
// the session context of the brie task's subtask, such as `CREATE TABLE`.
se sessionctx.Context
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/brietest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_test(
"//pkg/config",
"//pkg/executor",
"//pkg/parser/mysql",
"//pkg/session",
"//pkg/sessionctx/binloginfo",
"//pkg/store/mockstore/mockcopr",
"//pkg/testkit",
Expand Down
51 changes: 51 additions & 0 deletions tests/realtikvtest/brietest/brie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package brietest

import (
"context"
"fmt"
"os"
"strings"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -60,6 +62,7 @@ func TestShowBackupQuery(t *testing.T) {
restoreQuery := fmt.Sprintf("RESTORE TABLE `test`.`foo` FROM 'local://%s'", sqlTmp)
tk.MustQuery(restoreQuery)
res = tk.MustQuery("show br job query 2;")
tk.MustExec("drop table foo;")
res.CheckContain(restoreQuery)
}

Expand All @@ -68,6 +71,7 @@ func TestShowBackupQueryRedact(t *testing.T) {

executor.ResetGlobalBRIEQueueForTest()
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/block-on-brie", "return")
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/block-on-brie")
ch := make(chan any)
go func() {
tk := testkit.NewTestKit(t, tk.Session().GetStore())
Expand Down Expand Up @@ -102,6 +106,7 @@ func TestCancel(t *testing.T) {
executor.ResetGlobalBRIEQueueForTest()
tk.MustExec("use test;")
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/block-on-brie", "return")
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/block-on-brie")

req := require.New(t)
ch := make(chan struct{})
Expand All @@ -126,3 +131,49 @@ func TestCancel(t *testing.T) {
req.FailNow("the backup job doesn't be canceled")
}
}

func TestExistedTables(t *testing.T) {
tk := initTestKit(t)
tmp := makeTempDirForBackup(t)
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
executor.ResetGlobalBRIEQueueForTest()
tk.MustExec("use test;")
for i := 0; i < 10; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("create table %s(pk int primary key auto_increment, v varchar(255));", tableName))
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
}

done := make(chan struct{})
go func() {
defer close(done)
backupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s'", sqlTmp)
_ = tk.MustQuery(backupQuery)
}()
select {
case <-time.After(20 * time.Second):
t.Fatal("Backup operation exceeded")
case <-done:
}

done = make(chan struct{})
go func() {
defer close(done)
restoreQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s'", sqlTmp)
res, err := tk.Exec(restoreQuery)
require.NoError(t, err)

_, err = session.ResultSetToStringSlice(context.Background(), tk.Session(), res)
require.ErrorContains(t, err, "table already exists")
}()
select {
case <-time.After(20 * time.Second):
t.Fatal("Restore operation exceeded")
case <-done:
}

for i := 0; i < 10; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
}
}

0 comments on commit f471302

Please sign in to comment.