br: error if the log restore has no full backup schema or id maps #54421

Merged · 7 commits · Aug 5, 2024
104 changes: 63 additions & 41 deletions br/pkg/restore/log_client/client.go
@@ -20,6 +20,7 @@ import (
"crypto/tls"
"fmt"
"math"
"os"
"slices"
"strconv"
"strings"
@@ -637,15 +638,71 @@ type FullBackupStorageConfig struct {

type InitSchemaConfig struct {
// required
- IsNewTask bool
- HasFullRestore bool
- TableFilter filter.Filter
IsNewTask bool
TableFilter filter.Filter

// optional
TiFlashRecorder *tiflashrec.TiFlashRecorder
FullBackupStorage *FullBackupStorageConfig
}

const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"

func (rc *LogClient) generateDBReplacesFromFullBackupStorage(
ctx context.Context,
cfg *InitSchemaConfig,
) (map[stream.UpstreamID]*stream.DBReplace, error) {
dbReplaces := make(map[stream.UpstreamID]*stream.DBReplace)
if cfg.FullBackupStorage == nil {
envVal, ok := os.LookupEnv(UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL)
if ok && len(envVal) > 0 {
log.Info(fmt.Sprintf("the environment variable %s is active, skip loading the base schemas.", UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL))
return dbReplaces, nil
}
return nil, errors.Errorf("miss upstream table information at `start-ts`(%d) but the full backup path is not specified", rc.startTS)
}
s, err := storage.New(ctx, cfg.FullBackupStorage.Backend, cfg.FullBackupStorage.Opts)
if err != nil {
return nil, errors.Trace(err)
}
fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter)
if err != nil {
return nil, errors.Trace(err)
}
for _, t := range fullBackupTables {
dbName, _ := utils.GetSysDBCIStrName(t.DB.Name)
newDBInfo, exist := rc.dom.InfoSchema().SchemaByName(dbName)
if !exist {
log.Info("db not existed", zap.String("dbname", dbName.String()))
continue
}

dbReplace, exist := dbReplaces[t.DB.ID]
if !exist {
dbReplace = stream.NewDBReplace(t.DB.Name.O, newDBInfo.ID)
dbReplaces[t.DB.ID] = dbReplace
}

if t.Info == nil {
// If the db is empty, skip it.
continue
}
newTableInfo, err := restore.GetTableSchema(rc.GetDomain(), dbName, t.Info.Name)
if err != nil {
log.Info("table not existed", zap.String("tablename", dbName.String()+"."+t.Info.Name.String()))
continue
}

dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
Name: newTableInfo.Name.O,
TableID: newTableInfo.ID,
PartitionMap: restoreutils.GetPartitionIDMap(newTableInfo, t.Info),
IndexMap: restoreutils.GetIndexIDMap(newTableInfo, t.Info),
}
}
return dbReplaces, nil
}

// InitSchemasReplaceForDDL gets schemas information Mapping from old schemas to new schemas.
// It is used to rewrite meta kv-event.
func (rc *LogClient) InitSchemasReplaceForDDL(
@@ -658,7 +715,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
// the id map doesn't need to construct only when it is not the first execution
needConstructIdMap bool

- dbReplaces = make(map[stream.UpstreamID]*stream.DBReplace)
dbReplaces map[stream.UpstreamID]*stream.DBReplace
)

// not new task, load schemas map from external storage
@@ -673,7 +730,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(

// a new task, but without full snapshot restore, tries to load
// schemas map whose `restore-ts`` is the task's `start-ts`.
- if len(dbMaps) <= 0 && !cfg.HasFullRestore {
if len(dbMaps) <= 0 && cfg.FullBackupStorage == nil {
log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS))
needConstructIdMap = true
dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.startTS)
@@ -695,45 +752,10 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
if len(dbMaps) <= 0 {
log.Info("no id maps, build the table replaces from cluster and full backup schemas")
needConstructIdMap = true
- s, err := storage.New(ctx, cfg.FullBackupStorage.Backend, cfg.FullBackupStorage.Opts)
dbReplaces, err = rc.generateDBReplacesFromFullBackupStorage(ctx, cfg)
if err != nil {
return nil, errors.Trace(err)
}
- fullBackupTables, err := initFullBackupTables(ctx, s, cfg.TableFilter)
- if err != nil {
- return nil, errors.Trace(err)
- }
- for _, t := range fullBackupTables {
- dbName, _ := utils.GetSysDBCIStrName(t.DB.Name)
- newDBInfo, exist := rc.dom.InfoSchema().SchemaByName(dbName)
- if !exist {
- log.Info("db not existed", zap.String("dbname", dbName.String()))
- continue
- }
-
- dbReplace, exist := dbReplaces[t.DB.ID]
- if !exist {
- dbReplace = stream.NewDBReplace(t.DB.Name.O, newDBInfo.ID)
- dbReplaces[t.DB.ID] = dbReplace
- }
-
- if t.Info == nil {
- // If the db is empty, skip it.
- continue
- }
- newTableInfo, err := restore.GetTableSchema(rc.GetDomain(), dbName, t.Info.Name)
- if err != nil {
- log.Info("table not existed", zap.String("tablename", dbName.String()+"."+t.Info.Name.String()))
- continue
- }
-
- dbReplace.TableMap[t.Info.ID] = &stream.TableReplace{
- Name: newTableInfo.Name.O,
- TableID: newTableInfo.ID,
- PartitionMap: restoreutils.GetPartitionIDMap(newTableInfo, t.Info),
- IndexMap: restoreutils.GetIndexIDMap(newTableInfo, t.Info),
- }
- }
} else {
dbReplaces = stream.FromSchemaMaps(dbMaps)
}
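Taken together, the client.go hunks move the base-schema loading into the new `generateDBReplacesFromFullBackupStorage` helper and turn a missing full backup storage into a hard error, unless the `UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL` environment variable is set. The sketch below isolates that gate; `checkFullBackupBase` and its parameters are simplified stand-ins, and only the environment-variable name and the error wording are taken from the diff.

```go
package main

import (
	"fmt"
	"os"
)

const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"

// checkFullBackupBase mirrors the guard at the top of
// generateDBReplacesFromFullBackupStorage: without a full backup storage the
// restore may only continue when the unsafe escape hatch is set, in which
// case the base schemas are skipped and an empty replace map is used.
func checkFullBackupBase(hasFullBackupStorage bool, startTS uint64) (skipBaseSchemas bool, err error) {
	if hasFullBackupStorage {
		return false, nil // base schemas will be built from the full backup
	}
	if v, ok := os.LookupEnv(UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL); ok && len(v) > 0 {
		return true, nil // operator asserts there was no upstream user DDL before start-ts
	}
	return false, fmt.Errorf(
		"miss upstream table information at `start-ts`(%d) but the full backup path is not specified", startTS)
}

func main() {
	// With neither a full backup storage nor the env var set, the restore fails fast.
	skip, err := checkFullBackupBase(false, 449348693827387392)
	fmt.Println(skip, err)
}
```

The real helper goes on to build per-table `TableReplace` entries from the full backup; the error path above is what turns a previously silent misconfiguration into an explicit failure.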
23 changes: 4 additions & 19 deletions br/pkg/task/stream.go
@@ -435,18 +435,6 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context) error {
m.ClusterVersion = clusterVersion
})

- schemas := backup.NewBackupSchemas(func(storage kv.Storage, fn func(*model.DBInfo, *model.TableInfo)) error {
- return backup.BuildFullSchema(storage, s.cfg.StartTS, func(dbInfo *model.DBInfo, tableInfo *model.TableInfo) {
- fn(dbInfo, tableInfo)
- })
- }, 0)
-
- err = schemas.BackupSchemas(ctx, metaWriter, nil, s.mgr.GetStorage(), nil,
- s.cfg.StartTS, backup.DefaultSchemaConcurrency, 0, true, nil)
- if err != nil {
- return errors.Trace(err)
- }
-
if err = metaWriter.FlushBackupMeta(ctx); err != nil {
return errors.Trace(err)
}
@@ -1364,7 +1352,6 @@ func restoreStream(
// get the schemas ID replace information.
schemasReplace, err := client.InitSchemasReplaceForDDL(ctx, &logclient.InitSchemaConfig{
IsNewTask: newTask,
- HasFullRestore: len(cfg.FullBackupStorage) > 0,
TableFilter: cfg.TableFilter,
TiFlashRecorder: cfg.tiflashRecorder,
FullBackupStorage: fullBackupStorage,
@@ -1686,13 +1673,11 @@ func getFullBackupTS(
func parseFullBackupTablesStorage(
cfg *RestoreConfig,
) (*logclient.FullBackupStorageConfig, error) {
- var storageName string
- if len(cfg.FullBackupStorage) > 0 {
- storageName = cfg.FullBackupStorage
- } else {
- storageName = cfg.Storage
if len(cfg.FullBackupStorage) == 0 {
log.Info("the full backup path is not specified, so BR will try to get id maps")
return nil, nil
}
- u, err := storage.ParseBackend(storageName, &cfg.BackendOptions)
u, err := storage.ParseBackend(cfg.FullBackupStorage, &cfg.BackendOptions)
if err != nil {
return nil, errors.Trace(err)
}
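On the task side, `parseFullBackupTablesStorage` no longer falls back to the log backup's own storage (`cfg.Storage`) when `cfg.FullBackupStorage` is empty; it now returns a nil config, and BR instead tries to load the id maps saved by a previous task, erroring out in the client if those are missing too. A minimal sketch of the new contract, with simplified stand-in types (only the nil-on-empty behaviour is taken from the diff):

```go
package main

import "fmt"

// Simplified stand-ins for RestoreConfig and logclient.FullBackupStorageConfig.
type restoreConfig struct {
	Storage           string // log backup storage; no longer used as a fallback
	FullBackupStorage string // optional full backup storage
}

type fullBackupStorageConfig struct {
	Backend string
}

// parseFullBackupTablesStorage sketches the changed contract: an empty
// FullBackupStorage yields (nil, nil) instead of silently reusing Storage.
func parseFullBackupTablesStorage(cfg *restoreConfig) (*fullBackupStorageConfig, error) {
	if len(cfg.FullBackupStorage) == 0 {
		// BR will instead try to load previously saved id maps; the log client
		// errors out later if neither id maps nor a full backup are available.
		return nil, nil
	}
	return &fullBackupStorageConfig{Backend: cfg.FullBackupStorage}, nil
}

func main() {
	cfg := &restoreConfig{Storage: "s3://bucket/log-backup"}
	got, err := parseFullBackupTablesStorage(cfg)
	fmt.Println(got == nil, err) // true <nil>: callers must handle the nil config
}
```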