Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
backup: add raw backup command
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Dec 13, 2019
1 parent ca84583 commit 2820b4b
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 39 deletions.
130 changes: 98 additions & 32 deletions cmd/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"bytes"
"context"

"github.com/pingcap/errors"
Expand All @@ -24,6 +25,16 @@ const (
flagBackupTable = "table"
)

type backupContext struct {
db string
table string

isRawKv bool
startKey []byte
endKey []byte
cf string
}

func defineBackupFlags(flagSet *pflag.FlagSet) {
flagSet.StringP(
flagBackupTimeago, "", "",
Expand All @@ -36,7 +47,7 @@ func defineBackupFlags(flagSet *pflag.FlagSet) {
"Run checksum after backup")
}

func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
func runBackup(flagSet *pflag.FlagSet, cmdName string, bc backupContext) error {
ctx, cancel := context.WithCancel(defaultContext)
defer cancel()

Expand Down Expand Up @@ -90,10 +101,18 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
return err
}

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
mgr.GetDomain(), mgr.GetTiKV(), backupTS, db, table)
if err != nil {
return err
var (
ranges []backup.Range
backupSchemas *backup.Schemas
)
if bc.isRawKv {
ranges = []backup.Range{{StartKey: bc.startKey, EndKey: bc.endKey}}
} else {
ranges, backupSchemas, err = backup.BuildBackupRangeAndSchema(
mgr.GetDomain(), mgr.GetTiKV(), backupTS, bc.db, bc.table)
if err != nil {
return err
}
}

// The number of regions need to backup
Expand All @@ -112,38 +131,39 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
updateCh := utils.StartProgress(
ctx, cmdName, int64(approximateRegions), !HasLogFile())
err = client.BackupRanges(
ctx, ranges, backupTS, ratelimit, concurrency, updateCh)
ctx, ranges, backupTS, ratelimit, concurrency, updateCh, bc.isRawKv, bc.cf)
if err != nil {
return err
}
// Backup has finished
close(updateCh)

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
backupSchemas.SetSkipChecksum(!checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}
if backupSchemas != nil {
// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
backupSchemas.SetSkipChecksum(!checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

valid, err := client.FastChecksum()
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum failed!")
err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}
valid, err := client.FastChecksum()
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum failed!")
}
// Checksum has finished
close(updateCh)
}
// Checksum has finished
close(updateCh)

return client.SaveBackupMeta(ctx)
}
Expand Down Expand Up @@ -171,6 +191,7 @@ func NewBackupCommand() *cobra.Command {
newFullBackupCommand(),
newDbBackupCommand(),
newTableBackupCommand(),
newRawBackupCommand(),
)

defineBackupFlags(command.PersistentFlags())
Expand All @@ -184,7 +205,8 @@ func newFullBackupCommand() *cobra.Command {
Short: "backup all database",
RunE: func(command *cobra.Command, _ []string) error {
// empty db/table means full backup.
return runBackup(command.Flags(), "Full backup", "", "")
bc := backupContext{db: "", table: "", isRawKv: false}
return runBackup(command.Flags(), "Full backup", bc)
},
}
return command
Expand All @@ -203,7 +225,8 @@ func newDbBackupCommand() *cobra.Command {
if len(db) == 0 {
return errors.Errorf("empty database name is not allowed")
}
return runBackup(command.Flags(), "Database backup", db, "")
bc := backupContext{db: db, table: "", isRawKv: false}
return runBackup(command.Flags(), "Database backup", bc)
},
}
command.Flags().StringP(flagBackupDB, "", "", "backup a table in the specific db")
Expand Down Expand Up @@ -232,8 +255,9 @@ func newTableBackupCommand() *cobra.Command {
if len(table) == 0 {
return errors.Errorf("empty table name is not allowed")
}
bc := backupContext{db: db, table: table, isRawKv: false}
return runBackup(command.Flags(), "Table backup", bc)

return runBackup(command.Flags(), "Table backup", db, table)
},
}
command.Flags().StringP(flagBackupDB, "", "", "backup a table in the specific db")
Expand All @@ -242,3 +266,45 @@ func newTableBackupCommand() *cobra.Command {
_ = command.MarkFlagRequired(flagBackupTable)
return command
}

// newRawBackupCommand return a raw kv range backup subcommand.
func newRawBackupCommand() *cobra.Command {
command := &cobra.Command{
Use: "raw",
Short: "backup a raw kv range from TiKV cluster",
RunE: func(command *cobra.Command, _ []string) error {
start, err := command.Flags().GetString("start")
if err != nil {
return err
}
startKey, err := utils.ParseKey(command.Flags(), start)
if err != nil {
return err
}
end, err := command.Flags().GetString("end")
if err != nil {
return err
}
endKey, err := utils.ParseKey(command.Flags(), end)
if err != nil {
return err
}

cf, err := command.Flags().GetString("cf")
if err != nil {
return err
}

if bytes.Compare(startKey, endKey) > 0 {
return errors.New("input endKey must greater or equal than startKey")
}
bc := backupContext{startKey: startKey, endKey: endKey, isRawKv: true, cf: cf}
return runBackup(command.Flags(), "Raw Backup", bc)
},
}
command.Flags().StringP("format", "", "raw", "raw key format")
command.Flags().StringP("cf", "", "default", "backup raw kv cf")
command.Flags().StringP("start", "", "", "backup raw kv start key")
command.Flags().StringP("end", "", "", "backup raw kv end key")
return command
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.4
github.com/pingcap/kvproto v0.0.0-20191212073621-373b0fec09a1
github.com/pingcap/kvproto v0.0.0-20191212111403-2c6422d4614b
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6
github.com/pingcap/pd v1.1.0-beta.0.20191115131715-6b7dc037010e
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191018025622-fbf07f9804da/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8 h1:P9jGgwVkLHlbEGtgGKrY0k/yy6N8L8Gdj8dliFswllU=
github.com/pingcap/kvproto v0.0.0-20191113105027-4f292e1801d8/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191212073621-373b0fec09a1 h1:ZwuIPYV68E2TvKoIQ/bP7b7sC03hLqFc1Y9VDzWXCdk=
github.com/pingcap/kvproto v0.0.0-20191212073621-373b0fec09a1/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191212111403-2c6422d4614b h1:TcrATUpJ9EADLXKmnREh+haj6GXY8sAkRFuqoIfVRUQ=
github.com/pingcap/kvproto v0.0.0-20191212111403-2c6422d4614b/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20191205054626-288fe5207ce6 h1:KrJorS9gGYMhsQjENNWAeB5ho28xbowZ74pfJWkOmFc=
Expand Down
24 changes: 20 additions & 4 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ func (bc *Client) BackupRanges(
rate uint64,
concurrency uint32,
updateCh chan<- struct{},
isRawKv bool,
cf string,
) error {
start := time.Now()
defer func() {
Expand All @@ -294,7 +296,7 @@ func (bc *Client) BackupRanges(
go func() {
for _, r := range ranges {
err := bc.backupRange(
ctx, r.StartKey, r.EndKey, backupTS, rate, concurrency, updateCh)
ctx, r.StartKey, r.EndKey, backupTS, rate, concurrency, updateCh, isRawKv, cf)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -341,6 +343,8 @@ func (bc *Client) backupRange(
rateMBs uint64,
concurrency uint32,
updateCh chan<- struct{},
isRawKv bool,
cf string,
) error {
// The unit of rate limit in protocol is bytes per second.
rateLimit := rateMBs * 1024 * 1024
Expand All @@ -366,6 +370,8 @@ func (bc *Client) backupRange(
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
IsRawKv: isRawKv,
Cf: cf,
}
push := newPushDown(ctx, bc.mgr, len(allStores))

Expand All @@ -386,9 +392,19 @@ func (bc *Client) backupRange(

bc.backupMeta.StartVersion = backupTS
bc.backupMeta.EndVersion = backupTS
log.Info("backup time range",
zap.Reflect("StartVersion", backupTS),
zap.Reflect("EndVersion", backupTS))
bc.backupMeta.IsRawKv = isRawKv
if bc.backupMeta.IsRawKv {
bc.backupMeta.RawRanges = append(bc.backupMeta.RawRanges,
&backup.RawRange{StartKey: startKey, EndKey: endKey, Cf: cf})
log.Info("backup raw ranges",
zap.ByteString("startKey", startKey),
zap.ByteString("endKey", endKey),
zap.String("cf", cf))
} else {
log.Info("backup time range",
zap.Reflect("StartVersion", backupTS),
zap.Reflect("EndVersion", backupTS))
}

results.tree.Ascend(func(i btree.Item) bool {
r := i.(*Range)
Expand Down
70 changes: 70 additions & 0 deletions pkg/utils/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package utils

import (
"bytes"
"encoding/hex"
"fmt"
"io"
"strings"

"github.com/pingcap/errors"
"github.com/spf13/pflag"
)

// ParseKey parse key by given format
func ParseKey(flags *pflag.FlagSet, key string) ([]byte, error) {
switch flags.Lookup("format").Value.String() {
case "raw":
return []byte(key), nil
case "escaped":
return unescapedKey(key)
case "hex":
key, err := hex.DecodeString(key)
if err != nil {
return nil, errors.WithStack(err)
}
return key, nil
}
return nil, errors.New("unknown format")
}

func unescapedKey(text string) ([]byte, error) {
var buf []byte
r := bytes.NewBuffer([]byte(text))
for {
c, err := r.ReadByte()
if err != nil {
if err != io.EOF {
return nil, errors.WithStack(err)
}
break
}
if c != '\\' {
buf = append(buf, c)
continue
}
n := r.Next(1)
if len(n) == 0 {
return nil, io.EOF
}
// See: https://golang.org/ref/spec#Rune_literals
if idx := strings.IndexByte(`abfnrtv\'"`, n[0]); idx != -1 {
buf = append(buf, []byte("\a\b\f\n\r\t\v\\'\"")[idx])
continue
}

switch n[0] {
case 'x':
fmt.Sscanf(string(r.Next(2)), "%02x", &c)
buf = append(buf, c)
default:
n = append(n, r.Next(2)...)
_, err := fmt.Sscanf(string(n), "%03o", &c)
if err != nil {
return nil, errors.WithStack(err)
}
buf = append(buf, c)
}
}
return buf, nil
}
41 changes: 41 additions & 0 deletions pkg/utils/key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package utils

import (
"encoding/hex"

. "github.com/pingcap/check"
"github.com/spf13/pflag"
)

type testKeySuite struct{}

var _ = Suite(&testKeySuite{})

func (r *testKeySuite) TestParseKey(c *C) {
flagSet := &pflag.FlagSet{}
flagSet.String("format", "raw", "")
rawKey := "1234"
parsedKey, err := ParseKey(flagSet, rawKey)
c.Assert(err, IsNil)
c.Assert(parsedKey, BytesEquals, []byte(rawKey))

flagSet = &pflag.FlagSet{}
flagSet.String("format", "escaped", "")
escapedKey := "\\a\\x1"
parsedKey, err = ParseKey(flagSet, escapedKey)
c.Assert(err, IsNil)
c.Assert(parsedKey, BytesEquals, []byte("\a\x01"))

flagSet = &pflag.FlagSet{}
flagSet.String("format", "hex", "")
hexKey := hex.EncodeToString([]byte("1234"))
parsedKey, err = ParseKey(flagSet, hexKey)
c.Assert(err, IsNil)
c.Assert(parsedKey, BytesEquals, []byte("1234"))

flagSet = &pflag.FlagSet{}
flagSet.String("format", "notSupport", "")
_, err = ParseKey(flagSet, rawKey)
c.Assert(err, ErrorMatches, "*unknown format*")

}

0 comments on commit 2820b4b

Please sign in to comment.