Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log-backup: add flag for restore point #39431

Merged
merged 4 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,5 @@ func newStreamRestoreCommand() *cobra.Command {
}
task.DefineFilterFlags(command, filterOutSysAndMemTables, true)
task.DefineStreamRestoreFlags(command)
command.Hidden = true
return command
}
50 changes: 31 additions & 19 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,19 @@ const (
// FlagStreamFullBackupStorage is used for log restore, represents the full backup storage.
FlagStreamFullBackupStorage = "full-backup-storage"
// FlagPiTRBatchCount and FlagPiTRBatchSize are used for restore log with batch method.
FlagPiTRBatchCount = "pitr-batch-count"
FlagPiTRBatchSize = "pitr-batch-size"

defaultPiTRBatchCount = 16
defaultPiTRBatchSize = 32 * 1024 * 1024
defaultRestoreConcurrency = 128
defaultRestoreStreamConcurrency = 128
maxRestoreBatchSizeLimit = 10240
defaultPDConcurrency = 1
defaultBatchFlushInterval = 16 * time.Second
defaultFlagDdlBatchSize = 128
resetSpeedLimitRetryTimes = 3
FlagPiTRBatchCount = "pitr-batch-count"
FlagPiTRBatchSize = "pitr-batch-size"
FlagPiTRConcurrency = "pitr-concurrency"

defaultPiTRBatchCount = 8
defaultPiTRBatchSize = 16 * 1024 * 1024
defaultRestoreConcurrency = 128
defaultPiTRConcurrency = 16
maxRestoreBatchSizeLimit = 10240
defaultPDConcurrency = 1
defaultBatchFlushInterval = 16 * time.Second
defaultFlagDdlBatchSize = 128
resetSpeedLimitRetryTimes = 3
)

const (
Expand Down Expand Up @@ -175,6 +176,7 @@ type RestoreConfig struct {
tiflashRecorder *tiflashrec.TiFlashRecorder `json:"-" toml:"-"`
PitrBatchCount uint32 `json:"pitr-batch-count" toml:"pitr-batch-count"`
PitrBatchSize uint32 `json:"pitr-batch-size" toml:"pitr-batch-size"`
PitrConcurrency uint32 `json:"-" toml:"-"`

// for ebs-based restore
FullBackupType FullBackupType `json:"full-backup-type" toml:"full-backup-type"`
Expand Down Expand Up @@ -206,10 +208,9 @@ func DefineStreamRestoreFlags(command *cobra.Command) {
"support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'")
command.Flags().String(FlagStreamFullBackupStorage, "", "specify the backup full storage. "+
"fill it if want restore full backup before restore log.")
command.Flags().Uint32(FlagPiTRBatchCount, defaultPiTRBatchCount, "")
command.Flags().Uint32(FlagPiTRBatchSize, defaultPiTRBatchSize, "")
_ = command.Flags().MarkHidden(FlagPiTRBatchCount)
_ = command.Flags().MarkHidden(FlagPiTRBatchSize)
command.Flags().Uint32(FlagPiTRBatchCount, defaultPiTRBatchCount, "specify the batch count to restore log.")
command.Flags().Uint32(FlagPiTRBatchSize, defaultPiTRBatchSize, "specify the batch size to retore log.")
command.Flags().Uint32(FlagPiTRConcurrency, defaultPiTRConcurrency, "specify the concurrency to restore log.")
}

// ParseStreamRestoreFlags parses the `restore stream` flags from the flag set.
Expand Down Expand Up @@ -244,6 +245,9 @@ func (cfg *RestoreConfig) ParseStreamRestoreFlags(flags *pflag.FlagSet) error {
if cfg.PitrBatchSize, err = flags.GetUint32(FlagPiTRBatchSize); err != nil {
return errors.Trace(err)
}
if cfg.PitrConcurrency, err = flags.GetUint32(FlagPiTRConcurrency); err != nil {
return errors.Trace(err)
}
return nil
}

Expand Down Expand Up @@ -370,10 +374,18 @@ func (cfg *RestoreConfig) Adjust() {
}

func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() {
if cfg.Config.Concurrency == 0 {
log.Info("set restore kv files concurrency", zap.Int("concurrency", defaultRestoreStreamConcurrency))
cfg.Config.Concurrency = defaultRestoreStreamConcurrency
if cfg.PitrConcurrency == 0 {
cfg.PitrConcurrency = defaultPiTRConcurrency
}
if cfg.PitrBatchCount == 0 {
cfg.PitrBatchCount = defaultPiTRBatchCount
}
if cfg.PitrBatchSize == 0 {
cfg.PitrBatchSize = defaultPiTRBatchSize
}

log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency)))
cfg.Config.Concurrency = cfg.PitrConcurrency
}

func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *RestoreConfig) error {
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/task/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ func TestConfigureRestoreClient(t *testing.T) {
require.True(t, client.IsOnline())
}

func TestAdjustRestoreConfigForStreamRestore(t *testing.T) {
restoreCfg := RestoreConfig{}

restoreCfg.adjustRestoreConfigForStreamRestore()
require.Equal(t, restoreCfg.PitrBatchCount, uint32(defaultPiTRBatchCount))
require.Equal(t, restoreCfg.PitrBatchSize, uint32(defaultPiTRBatchSize))
require.Equal(t, restoreCfg.PitrConcurrency, uint32(defaultPiTRConcurrency))
require.Equal(t, restoreCfg.Concurrency, restoreCfg.PitrConcurrency)
}

func TestCheckRestoreDBAndTable(t *testing.T) {
cases := []struct {
cfgSchemas map[string]struct{}
Expand Down