Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
backup: add raw backup command
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Dec 11, 2019
1 parent a7778a5 commit bdb47e6
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 3 deletions.
86 changes: 84 additions & 2 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewBackupCommand() *cobra.Command {
command.AddCommand(
newFullBackupCommand(),
newTableBackupCommand(),
newRawBackupCommand(),
)

command.PersistentFlags().StringP(
Expand Down Expand Up @@ -134,7 +135,7 @@ func newFullBackupCommand() *cobra.Command {
updateCh := utils.StartProgress(
ctx, "Full Backup", int64(approximateRegions), !HasLogFile())
err = client.BackupRanges(
ctx, ranges, backupTS, rate, concurrency, updateCh)
ctx, ranges, backupTS, rate, concurrency, updateCh, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -273,7 +274,7 @@ func newTableBackupCommand() *cobra.Command {
updateCh := utils.StartProgress(
ctx, "Table Backup", int64(approximateRegions), !HasLogFile())
err = client.BackupRanges(
ctx, ranges, backupTS, rate, concurrency, updateCh)
ctx, ranges, backupTS, rate, concurrency, updateCh, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -319,3 +320,84 @@ func newTableBackupCommand() *cobra.Command {
}
return command
}

// newRawBackupCommand return a raw kv range backup subcommand.
func newRawBackupCommand() *cobra.Command {
command := &cobra.Command{
Use: "raw",
Short: "backup a raw kv range",
RunE: func(command *cobra.Command, _ []string) error {
ctx, cancel := context.WithCancel(defaultContext)
defer cancel()

mgr, err := GetDefaultMgr()
if err != nil {
return err
}
defer mgr.Close()

client, err := backup.NewBackupClient(ctx, mgr)
if err != nil {
return err
}
u, err := storage.ParseBackendFromFlags(command.Flags(), FlagStorage)
if err != nil {
return err
}

err = client.SetStorage(u)
if err != nil {
return err
}

start, err := command.Flags().GetString("start")
if err != nil {
return err
}

end, err := command.Flags().GetString("end")
if err != nil {
return err
}

rate, err := command.Flags().GetUint64("ratelimit")
if err != nil {
return err
}
concurrency, err := command.Flags().GetUint32("concurrency")
if err != nil {
return err
}
if concurrency == 0 {
return errors.New("at least one thread required")
}

ranges := []backup.Range{{StartKey: []byte(start), EndKey: []byte(end)}}

approximateRegions := 0
for _, r := range ranges {
var regionCount int
regionCount, err = client.GetRangeRegionCount(r.StartKey, r.EndKey)
if err != nil {
return err
}
approximateRegions += regionCount
}

updateCh := utils.StartProgress(
ctx, "Raw Backup", int64(approximateRegions), !HasLogFile())

err = client.BackupRanges(
ctx, ranges, 0, rate, concurrency, updateCh, true)
if err != nil {
return err
}
close(updateCh)

return client.SaveBackupMeta()
},
}
command.Flags().StringP("start", "", "", "backup raw kv start key")
command.Flags().StringP("end", "", "", "backup raw kv end key")
return command
}
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8 h1:P9jGgwVkLHlbEGt
github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191210040729-c23886becb54 h1:T8myp+i7bPLy/W4rEjtsAZgjGTqQ0rnLu9xQ4YAfXJU=
github.com/pingcap/kvproto v0.0.0-20191210040729-c23886becb54/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191211032946-5dbce7e7b868 h1:4FIcMXOHW2CwuTHJFuU/nLQ9KH2YYIoizuWfh8cQKk8=
github.com/pingcap/kvproto v0.0.0-20191211032946-5dbce7e7b868/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6 h1:KrJorS9gGYMhsQjENNWAeB5ho28xbowZ74pfJWkOmFc=
Expand Down
7 changes: 6 additions & 1 deletion pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func (bc *Client) BackupRanges(
rate uint64,
concurrency uint32,
updateCh chan<- struct{},
isRawKv bool,
) error {
start := time.Now()
defer func() {
Expand All @@ -285,7 +286,7 @@ func (bc *Client) BackupRanges(
go func() {
for _, r := range ranges {
err := bc.backupRange(
ctx, r.StartKey, r.EndKey, backupTS, rate, concurrency, updateCh)
ctx, r.StartKey, r.EndKey, backupTS, rate, concurrency, updateCh, isRawKv)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -332,6 +333,7 @@ func (bc *Client) backupRange(
rateMBs uint64,
concurrency uint32,
updateCh chan<- struct{},
isRawKv bool,
) error {
// The unit of rate limit in protocol is bytes per second.
rateLimit := rateMBs * 1024 * 1024
Expand All @@ -357,6 +359,7 @@ func (bc *Client) backupRange(
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
IsRawKv: isRawKv,
}
push := newPushDown(ctx, bc.mgr, len(allStores))

Expand All @@ -377,6 +380,7 @@ func (bc *Client) backupRange(

bc.backupMeta.StartVersion = backupTS
bc.backupMeta.EndVersion = backupTS
bc.backupMeta.IsRawKv = isRawKv
log.Info("backup time range",
zap.Reflect("StartVersion", backupTS),
zap.Reflect("EndVersion", backupTS))
Expand Down Expand Up @@ -677,6 +681,7 @@ func SendBackup(

// GetRangeRegionCount get region count by pd http api
func (bc *Client) GetRangeRegionCount(startKey, endKey []byte) (int, error) {
// TODO get region count by [startKey, endKey)
return bc.mgr.GetRegionCount()
}

Expand Down

0 comments on commit bdb47e6

Please sign in to comment.