Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: Add pre-check of duplicate table in the downstream #55044

Merged
merged 34 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var (
ErrRestoreIncompatibleSys = errors.Normalize("incompatible system table", errors.RFCCodeText("BR:Restore:ErrRestoreIncompatibleSys"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))
ErrDatabasesAlreadyExisted = errors.Normalize("databases already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrDatabasesAlreadyExisted"))
ErrTablesAlreadyExisted = errors.Normalize("tables already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrTablesAlreadyExisted"))

// ErrStreamLogTaskExist is the error when stream log task already exists, because of supporting single task currently.
ErrStreamLogTaskExist = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist"))
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ import (
pd "github.com/tikv/pd/client"
)

// GlueClient identifies which kind of front end a Glue implementation serves.
// It is returned by Glue.GetClient.
type GlueClient int

const (
	// ClientCLP is the zero value of GlueClient.
	// NOTE(review): presumably the br command-line client — confirm the
	// expansion of "CLP".
	ClientCLP = GlueClient(iota)
	// ClientSql is the enumerator that follows ClientCLP (value 1).
	// NOTE(review): presumably BRIE tasks started through SQL statements —
	// confirm.
	ClientSql
)

// Glue is an abstraction of TiDB function calls used in BR.
type Glue interface {
GetDomain(store kv.Storage) (*domain.Domain, error)
Expand All @@ -36,6 +43,9 @@ type Glue interface {
// we can close domain as soon as possible.
// and we must reuse the exists session and don't close it in SQL backup job.
UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error

// GetClient returns the client type of the glue
GetClient() GlueClient
}

// Session is an abstraction of the session.Session interface.
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue
return nil
}

// GetClient implements glue.Glue. This glue always reports glue.ClientCLP.
func (Glue) GetClient() glue.GlueClient {
return glue.ClientCLP
}

// GetSessionCtx implements glue.Glue
func (gs *tidbSession) GetSessionCtx() sessionctx.Context {
return gs.se
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/gluetidb/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,7 @@ func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func
}
return fn(glueSession)
}

// GetClient implements glue.Glue. The mock always reports glue.ClientCLP.
func (*MockGlue) GetClient() glue.GlueClient {
return glue.ClientCLP
}
4 changes: 4 additions & 0 deletions br/pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ func (Glue) GetVersion() string {
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
// No session is created here: fn is never invoked and nil is always returned.
return nil
}

// GetClient implements glue.Glue. This glue always reports glue.ClientCLP.
func (Glue) GetClient() glue.GlueClient {
return glue.ClientCLP
}
33 changes: 33 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -179,6 +180,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
_ = flags.MarkHidden(FlagStatsConcurrency)
_ = flags.MarkHidden(FlagBatchFlushInterval)
_ = flags.MarkHidden(FlagDdlBatchSize)
_ = flags.MarkHidden(flagUseFSR)
}

// ParseFromFlags parses the config from the flag set.
Expand Down Expand Up @@ -910,6 +912,11 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if cfg.WithSysTable {
client.InitFullClusterRestore(cfg.ExplicitFilter)
}
} else if checkpointFirstRun && cfg.CheckRequirements {
if err := checkTableExistence(ctx, mgr, tables, g); err != nil {
schedulersRemovable = true
return errors.Trace(err)
}
}

if client.IsFullClusterRestore() && client.HasBackedUpSysDB() {
Expand Down Expand Up @@ -1315,6 +1322,10 @@ func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File,
}
return base * uint64(ratio*10) / 10
}

// The preserve rate for TiKV is quite accurate, while the rate for TiFlash is
// a number calculated from TPC-C testing with variable data sizes. 1.4 is a
// relatively conservative value.
tikvUsage := preserve(EstimateTikvUsage(files, maxReplica, tikvCnt), 1.1)
tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.4)
log.Info("preserved disk space", zap.Uint64("tikv", tikvUsage), zap.Uint64("tiflash", tiflashUsage))
Expand Down Expand Up @@ -1358,6 +1369,28 @@ func Exhaust(ec <-chan error) []error {
}
}

func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table, g glue.Glue) error {
// Tasks from br clp client use other checks to validate
if g.GetClient() != glue.ClientSql {
return nil
}
message := "table already exists: "
allUnique := true
for _, table := range tables {
_, err := mgr.GetDomain().InfoSchema().TableByName(ctx, table.DB.Name, table.Info.Name)
RidRisR marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
message += fmt.Sprintf("%s.%s ", table.DB.Name, table.Info.Name)
allUnique = false
} else if !infoschema.ErrTableNotExists.Equal(err) {
return errors.Trace(err)
}
}
if !allUnique {
return errors.Annotate(berrors.ErrTablesAlreadyExisted, message)
}
return nil
}

// EstimateRangeSize estimates the total range count by file.
func EstimateRangeSize(files []*backuppb.File) int {
result := 0
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ error = '''
failed to write and ingest
'''

["BR:Restore:ErrTablesAlreadyExisted"]
error = '''
tables already existed in restored cluster
'''

["BR:Restore:ErrUnsupportedSysTable"]
error = '''
the system table isn't supported for restoring yet
Expand Down
4 changes: 4 additions & 0 deletions pkg/executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,10 @@ func (gs *tidbGlue) UseOneShotSession(_ kv.Storage, _ bool, fn func(se glue.Sess
return fn(glueSession)
}

// GetClient implements glue.Glue. It reports glue.ClientSql, the only client
// type for which checkTableExistence (br/pkg/task/restore.go) performs its
// duplicate-table pre-check.
func (*tidbGlue) GetClient() glue.GlueClient {
return glue.ClientSql
}

type tidbGlueSession struct {
// the session context of the brie task's subtask, such as `CREATE TABLE`.
se sessionctx.Context
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/brietest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_test(
"//pkg/config",
"//pkg/executor",
"//pkg/parser/mysql",
"//pkg/session",
"//pkg/sessionctx/binloginfo",
"//pkg/store/mockstore/mockcopr",
"//pkg/testkit",
Expand Down
51 changes: 51 additions & 0 deletions tests/realtikvtest/brietest/brie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package brietest

import (
"context"
"fmt"
"os"
"strings"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -60,6 +62,7 @@ func TestShowBackupQuery(t *testing.T) {
restoreQuery := fmt.Sprintf("RESTORE TABLE `test`.`foo` FROM 'local://%s'", sqlTmp)
tk.MustQuery(restoreQuery)
res = tk.MustQuery("show br job query 2;")
tk.MustExec("drop table foo;")
res.CheckContain(restoreQuery)
}

Expand All @@ -68,6 +71,7 @@ func TestShowBackupQueryRedact(t *testing.T) {

executor.ResetGlobalBRIEQueueForTest()
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/block-on-brie", "return")
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/block-on-brie")
ch := make(chan any)
go func() {
tk := testkit.NewTestKit(t, tk.Session().GetStore())
Expand Down Expand Up @@ -102,6 +106,7 @@ func TestCancel(t *testing.T) {
executor.ResetGlobalBRIEQueueForTest()
tk.MustExec("use test;")
failpoint.Enable("github.com/pingcap/tidb/pkg/executor/block-on-brie", "return")
defer failpoint.Disable("github.com/pingcap/tidb/pkg/executor/block-on-brie")

req := require.New(t)
ch := make(chan struct{})
Expand All @@ -126,3 +131,49 @@ func TestCancel(t *testing.T) {
req.FailNow("the backup job doesn't be canceled")
}
}

func TestExistedTables(t *testing.T) {
RidRisR marked this conversation as resolved.
Show resolved Hide resolved
tk := initTestKit(t)
tmp := makeTempDirForBackup(t)
sqlTmp := strings.ReplaceAll(tmp, "'", "''")
executor.ResetGlobalBRIEQueueForTest()
tk.MustExec("use test;")
for i := 0; i < 10; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("create table %s(pk int primary key auto_increment, v varchar(255));", tableName))
tk.MustExec(fmt.Sprintf("insert into %s(v) values %s;", tableName, strings.TrimSuffix(strings.Repeat("('hello, world'),", 100), ",")))
}

done := make(chan struct{})
go func() {
defer close(done)
backupQuery := fmt.Sprintf("BACKUP DATABASE * TO 'local://%s'", sqlTmp)
_ = tk.MustQuery(backupQuery)
}()
select {
case <-time.After(20 * time.Second):
t.Fatal("Backup operation exceeded")
case <-done:
}

done = make(chan struct{})
go func() {
defer close(done)
restoreQuery := fmt.Sprintf("RESTORE DATABASE * FROM 'local://%s'", sqlTmp)
res, err := tk.Exec(restoreQuery)
require.NoError(t, err)

_, err = session.ResultSetToStringSlice(context.Background(), tk.Session(), res)
require.ErrorContains(t, err, "table already exists")
}()
select {
case <-time.After(20 * time.Second):
t.Fatal("Restore operation exceeded")
case <-done:
}

for i := 0; i < 10; i++ {
tableName := fmt.Sprintf("foo%d", i)
tk.MustExec(fmt.Sprintf("drop table %s;", tableName))
}
}