Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fix-36888
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <[email protected]>
  • Loading branch information
YangKeao committed Jan 10, 2023
2 parents b18668d + c4d8ed1 commit 0c72d55
Show file tree
Hide file tree
Showing 56 changed files with 3,234 additions and 1,093 deletions.
1 change: 1 addition & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ go_test(
"search_test.go",
"split_test.go",
"stream_metas_test.go",
"systable_restore_test.go",
"util_test.go",
],
embed = [":restore"],
Expand Down
92 changes: 92 additions & 0 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2646,6 +2646,74 @@ func (rc *Client) SetWithSysTable(withSysTable bool) {
rc.withSysTable = withSysTable
}

func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage) error {
dom, err := g.GetDomain(storage)
if err != nil {
return errors.Trace(err)
}
info := dom.InfoSchema()
allSchema := info.AllSchemas()
recorder := tiflashrec.New()

expectTiFlashStoreCount := uint64(0)
needTiFlash := false
for _, s := range allSchema {
for _, t := range s.Tables {
if t.TiFlashReplica != nil {
expectTiFlashStoreCount = mathutil.Max(expectTiFlashStoreCount, t.TiFlashReplica.Count)
recorder.AddTable(t.ID, *t.TiFlashReplica)
needTiFlash = true
}
}
}
if !needTiFlash {
log.Info("no need to set tiflash replica, since there is no tables enable tiflash replica")
return nil
}
// we wait for ten minutes to wait tiflash starts.
// since tiflash only starts when set unmark recovery mode finished.
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err = utils.WithRetry(timeoutCtx, func() error {
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
log.Info("get tiflash store count for resetting TiFlash Replica",
zap.Uint64("count", tiFlashStoreCount))
if err != nil {
return errors.Trace(err)
}
if tiFlashStoreCount < expectTiFlashStoreCount {
log.Info("still waiting for enough tiflash store start",
zap.Uint64("expect", expectTiFlashStoreCount),
zap.Uint64("actual", tiFlashStoreCount),
)
return errors.New("tiflash store count is less than expected")
}
return nil
}, &waitTiFlashBackoffer{
Attempts: 30,
BaseBackoff: 4 * time.Second,
})
if err != nil {
return err
}

sqls := recorder.GenerateResetAlterTableDDLs(info)
log.Info("Generating SQLs for resetting tiflash replica",
zap.Strings("sqls", sqls))

return g.UseOneShotSession(storage, false, func(se glue.Session) error {
for _, sql := range sqls {
if errExec := se.ExecuteInternal(ctx, sql); errExec != nil {
logutil.WarnTerm("Failed to restore tiflash replica config, you may execute the sql restore it manually.",
logutil.ShortError(errExec),
zap.String("sql", sql),
)
}
}
return nil
})
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
Expand Down Expand Up @@ -2721,3 +2789,27 @@ func CheckNewCollationEnable(
log.Info("set new_collation_enabled", zap.Bool("new_collation_enabled", enabled))
return nil
}

type waitTiFlashBackoffer struct {
Attempts int
BaseBackoff time.Duration
}

// NextBackoff returns a duration to wait before retrying again
func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
bo := b.BaseBackoff
b.Attempts--
if b.Attempts == 0 {
return 0
}
b.BaseBackoff *= 2
if b.BaseBackoff > 32*time.Second {
b.BaseBackoff = 32 * time.Second
}
return bo
}

// Attempt returns the remain attempt times
func (b *waitTiFlashBackoffer) Attempt() int {
return b.Attempts
}
103 changes: 90 additions & 13 deletions br/pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import (
"go.uber.org/zap"
)

const sysUserTableName = "user"
const (
rootUser = "root"
sysUserTableName = "user"
cloudAdminUser = "cloud_admin"
)

var statsTables = map[string]struct{}{
"stats_buckets": {},
Expand Down Expand Up @@ -51,14 +55,14 @@ var unRecoverableTable = map[string]struct{}{
// skip clearing or restoring 'cloud_admin'@'%' which is a special
// user on TiDB Cloud
var sysPrivilegeTableMap = map[string]string{
"user": "not (user = 'cloud_admin' and host = '%')", // since v1.0.0
"db": "not (user = 'cloud_admin' and host = '%')", // since v1.0.0
"tables_priv": "not (user = 'cloud_admin' and host = '%')", // since v1.0.0
"columns_priv": "not (user = 'cloud_admin' and host = '%')", // since v1.0.0
"default_roles": "not (user = 'cloud_admin' and host = '%')", // since v3.0.0
"role_edges": "not (to_user = 'cloud_admin' and to_host = '%')", // since v3.0.0
"global_priv": "not (user = 'cloud_admin' and host = '%')", // since v3.0.8
"global_grants": "not (user = 'cloud_admin' and host = '%')", // since v5.0.3
"user": "(user = '%s' and host = '%%')", // since v1.0.0
"db": "(user = '%s' and host = '%%')", // since v1.0.0
"tables_priv": "(user = '%s' and host = '%%')", // since v1.0.0
"columns_priv": "(user = '%s' and host = '%%')", // since v1.0.0
"default_roles": "(user = '%s' and host = '%%')", // since v3.0.0
"role_edges": "(to_user = '%s' and to_host = '%%')", // since v3.0.0
"global_priv": "(user = '%s' and host = '%%')", // since v3.0.8
"global_grants": "(user = '%s' and host = '%%')", // since v5.0.3
}

func isUnrecoverableTable(tableName string) bool {
Expand All @@ -71,6 +75,78 @@ func isStatsTable(tableName string) bool {
return ok
}

func generateResetSQLs(db *database, resetUsers []string) []string {
if db.Name.L != mysql.SystemDB {
return nil
}
sqls := make([]string, 0, 10)
// we only need reset root password once
rootReset := false
for tableName := range db.ExistingTables {
if sysPrivilegeTableMap[tableName] != "" {
for _, name := range resetUsers {
if strings.ToLower(name) == rootUser {
if !rootReset {
updateSQL := fmt.Sprintf("UPDATE %s.%s SET authentication_string='',"+
" Shutdown_priv='Y',"+
" Config_priv='Y'"+
" WHERE USER='root' AND Host='%%';",
db.Name.L, sysUserTableName)
sqls = append(sqls, updateSQL)
rootReset = true
} else {
continue
}
} else {
/* #nosec G202: SQL string concatenation */
whereClause := fmt.Sprintf("WHERE "+sysPrivilegeTableMap[tableName], name)
deleteSQL := fmt.Sprintf("DELETE FROM %s %s;",
utils.EncloseDBAndTable(db.Name.L, tableName), whereClause)
sqls = append(sqls, deleteSQL)
}
}
}
}
return sqls
}

// ClearSystemUsers is used for volume-snapshot restoration.
// because we can not support restore user in some scenarios, for example in cloud.
// we'd better use this function to drop cloud_admin user after volume-snapshot restore.
func (rc *Client) ClearSystemUsers(ctx context.Context, resetUsers []string) error {
sysDB := mysql.SystemDB
db, ok := rc.getDatabaseByName(sysDB)
if !ok {
log.Warn("target database not exist, aborting", zap.String("database", sysDB))
return nil
}
execSQL := func(sql string) error {
// SQLs here only contain table name and database name, seems it is no need to redact them.
if err := rc.db.se.Execute(ctx, sql); err != nil {
log.Warn("failed to clear system users",
zap.Stringer("database", db.Name),
zap.String("sql", sql),
zap.Error(err),
)
return berrors.ErrUnknown.Wrap(err).GenWithStack("failed to execute %s", sql)
}
log.Info("successfully clear system users after restoration",
zap.Stringer("database", db.Name),
zap.String("sql", sql),
)
return nil
}

sqls := generateResetSQLs(db, resetUsers)
for _, sql := range sqls {
log.Info("reset system user for cloud", zap.String("sql", sql))
if err := execSQL(sql); err != nil {
return err
}
}
return nil
}

// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema).
// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.
func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) {
Expand Down Expand Up @@ -203,14 +279,15 @@ func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, ti *model
}

if db.ExistingTables[tableName] != nil {
whereClause := ""
whereNotClause := ""
if rc.fullClusterRestore && sysPrivilegeTableMap[tableName] != "" {
// cloud_admin is a special user on tidb cloud, need to skip it.
whereClause = fmt.Sprintf("WHERE %s", sysPrivilegeTableMap[tableName])
/* #nosec G202: SQL string concatenation */
whereNotClause = fmt.Sprintf("WHERE NOT "+sysPrivilegeTableMap[tableName], cloudAdminUser)
log.Info("full cluster restore, delete existing data",
zap.String("table", tableName), zap.Stringer("schema", db.Name))
deleteSQL := fmt.Sprintf("DELETE FROM %s %s;",
utils.EncloseDBAndTable(db.Name.L, tableName), whereClause)
utils.EncloseDBAndTable(db.Name.L, tableName), whereNotClause)
if err := execSQL(deleteSQL); err != nil {
return err
}
Expand All @@ -228,7 +305,7 @@ func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, ti *model
utils.EncloseDBAndTable(db.Name.L, tableName),
colListStr, colListStr,
utils.EncloseDBAndTable(db.TemporaryName.L, tableName),
whereClause)
whereNotClause)
return execSQL(replaceIntoSQL)
}

Expand Down
72 changes: 72 additions & 0 deletions br/pkg/restore/systable_restore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
"regexp"
"testing"

"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
)

func testTableInfo(name string) *model.TableInfo {
return &model.TableInfo{
Name: model.NewCIStr(name),
}
}

func TestGenerateResetSQL(t *testing.T) {
// case #1: ignore non-mysql databases
mockDB := &database{
ExistingTables: map[string]*model.TableInfo{},
Name: model.NewCIStr("non-mysql"),
TemporaryName: utils.TemporaryDBName("non-mysql"),
}
for name := range sysPrivilegeTableMap {
mockDB.ExistingTables[name] = testTableInfo(name)
}
resetUsers := []string{"cloud_admin", "root"}
require.Equal(t, 0, len(generateResetSQLs(mockDB, resetUsers)))

// case #2: ignore non expected table
mockDB = &database{
ExistingTables: map[string]*model.TableInfo{},
Name: model.NewCIStr("mysql"),
TemporaryName: utils.TemporaryDBName("mysql"),
}
for name := range sysPrivilegeTableMap {
name += "non_available"
mockDB.ExistingTables[name] = testTableInfo(name)
}
resetUsers = []string{"cloud_admin", "root"}
require.Equal(t, 0, len(generateResetSQLs(mockDB, resetUsers)))

// case #3: only reset cloud admin account
for name := range sysPrivilegeTableMap {
mockDB.ExistingTables[name] = testTableInfo(name)
}
resetUsers = []string{"cloud_admin"}
sqls := generateResetSQLs(mockDB, resetUsers)
require.Equal(t, 8, len(sqls))
for _, sql := range sqls {
// for cloud_admin we only generate DELETE sql
require.Regexp(t, regexp.MustCompile("DELETE*"), sql)
}

// case #4: reset cloud admin/other account
resetUsers = []string{"cloud_admin", "cloud_other"}
sqls = generateResetSQLs(mockDB, resetUsers)
require.Equal(t, 16, len(sqls))
for _, sql := range sqls {
// for cloud_admin/cloud_other we only generate DELETE sql
require.Regexp(t, regexp.MustCompile("DELETE*"), sql)
}

// case #5: reset cloud admin && root account
resetUsers = []string{"cloud_admin", "root"}
sqls = generateResetSQLs(mockDB, resetUsers)
// 8 DELETE sqls for cloud admin and 1 UPDATE sql for root
require.Equal(t, 9, len(sqls))
}
Loading

0 comments on commit 0c72d55

Please sign in to comment.