Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
backup: add raw backup command
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Feb 4, 2020
1 parent 3863a3a commit 80e615d
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 39 deletions.
130 changes: 98 additions & 32 deletions cmd/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"bytes"
"context"

"github.com/pingcap/errors"
Expand All @@ -25,6 +26,16 @@ const (
flagLastBackupTS = "lastbackupts"
)

type backupContext struct {
db string
table string

isRawKv bool
startKey []byte
endKey []byte
cf string
}

func defineBackupFlags(flagSet *pflag.FlagSet) {
flagSet.StringP(
flagBackupTimeago, "", "",
Expand All @@ -44,7 +55,7 @@ func defineBackupFlags(flagSet *pflag.FlagSet) {
_ = flagSet.MarkHidden(flagBackupRateLimitUnit)
}

func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
func runBackup(flagSet *pflag.FlagSet, cmdName string, bc backupContext) error {
ctx, cancel := context.WithCancel(defaultContext)
defer cancel()

Expand Down Expand Up @@ -110,10 +121,18 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {

defer summary.Summary(cmdName)

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
mgr.GetDomain(), mgr.GetTiKV(), backupTS, db, table)
if err != nil {
return err
var (
ranges []backup.Range
backupSchemas *backup.Schemas
)
if bc.isRawKv {
ranges = []backup.Range{{StartKey: bc.startKey, EndKey: bc.endKey}}
} else {
ranges, backupSchemas, err = backup.BuildBackupRangeAndSchema(
mgr.GetDomain(), mgr.GetTiKV(), backupTS, bc.db, bc.table)
if err != nil {
return err
}
}

// The number of regions need to backup
Expand All @@ -133,38 +152,39 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
updateCh := utils.StartProgress(
ctx, cmdName, int64(approximateRegions), !HasLogFile())
err = client.BackupRanges(
ctx, ranges, lastBackupTS, backupTS, ratelimit, concurrency, updateCh)
ctx, ranges, lastBackupTS, backupTS, ratelimit, concurrency, updateCh, bc.isRawKv, bc.cf)
if err != nil {
return err
}
// Backup has finished
close(updateCh)

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
backupSchemas.SetSkipChecksum(!checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}
if backupSchemas != nil {
// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
backupSchemas.SetSkipChecksum(!checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

valid, err := client.FastChecksum()
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum failed!")
err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}
valid, err := client.FastChecksum()
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum failed!")
}
// Checksum has finished
close(updateCh)
}
// Checksum has finished
close(updateCh)

err = client.SaveBackupMeta(ctx)
if err != nil {
Expand Down Expand Up @@ -198,6 +218,7 @@ func NewBackupCommand() *cobra.Command {
newFullBackupCommand(),
newDbBackupCommand(),
newTableBackupCommand(),
newRawBackupCommand(),
)

defineBackupFlags(command.PersistentFlags())
Expand All @@ -211,7 +232,8 @@ func newFullBackupCommand() *cobra.Command {
Short: "backup all database",
RunE: func(command *cobra.Command, _ []string) error {
// empty db/table means full backup.
return runBackup(command.Flags(), "Full backup", "", "")
bc := backupContext{db: "", table: "", isRawKv: false}
return runBackup(command.Flags(), "Full backup", bc)
},
}
return command
Expand All @@ -230,7 +252,8 @@ func newDbBackupCommand() *cobra.Command {
if len(db) == 0 {
return errors.Errorf("empty database name is not allowed")
}
return runBackup(command.Flags(), "Database backup", db, "")
bc := backupContext{db: db, table: "", isRawKv: false}
return runBackup(command.Flags(), "Database backup", bc)
},
}
command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
Expand Down Expand Up @@ -259,7 +282,8 @@ func newTableBackupCommand() *cobra.Command {
if len(table) == 0 {
return errors.Errorf("empty table name is not allowed")
}
return runBackup(command.Flags(), "Table backup", db, table)
bc := backupContext{db: db, table: table, isRawKv: false}
return runBackup(command.Flags(), "Table backup", bc)
},
}
command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
Expand All @@ -268,3 +292,45 @@ func newTableBackupCommand() *cobra.Command {
_ = command.MarkFlagRequired(flagTable)
return command
}

// newRawBackupCommand return a raw kv range backup subcommand.
func newRawBackupCommand() *cobra.Command {
command := &cobra.Command{
Use: "raw",
Short: "backup a raw kv range from TiKV cluster",
RunE: func(command *cobra.Command, _ []string) error {
start, err := command.Flags().GetString("start")
if err != nil {
return err
}
startKey, err := utils.ParseKey(command.Flags(), start)
if err != nil {
return err
}
end, err := command.Flags().GetString("end")
if err != nil {
return err
}
endKey, err := utils.ParseKey(command.Flags(), end)
if err != nil {
return err
}

cf, err := command.Flags().GetString("cf")
if err != nil {
return err
}

if bytes.Compare(startKey, endKey) > 0 {
return errors.New("input endKey must greater or equal than startKey")
}
bc := backupContext{startKey: startKey, endKey: endKey, isRawKv: true, cf: cf}
return runBackup(command.Flags(), "Raw Backup", bc)
},
}
command.Flags().StringP("format", "", "raw", "raw key format")
command.Flags().StringP("cf", "", "default", "backup raw kv cf")
command.Flags().StringP("start", "", "", "backup raw kv start key")
command.Flags().StringP("end", "", "", "backup raw kv end key")
return command
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4
github.com/pingcap/errors v0.11.4
github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c
github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01
github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ github.com/pingcap/kvproto v0.0.0-20191030021250-51b332bcb20b/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20191121022655-4c654046831d/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03 h1:IyJl+qesVPf3UfFFmKtX69y1K5KC8uXlot3U0QgH7V4=
github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c h1:CwVCq7XA/NvTQ6X9ZAhZlvcEvseUsHiPFQf2mL3LVl4=
github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7 h1:thLL2vFObG8vxBCkAmfAbLVBPfXUkBSXaVxppStCrL0=
github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
Expand Down
24 changes: 20 additions & 4 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ func (bc *Client) BackupRanges(
rateLimit uint64,
concurrency uint32,
updateCh chan<- struct{},
isRawKv bool,
cf string,
) error {
start := time.Now()
defer func() {
Expand All @@ -298,7 +300,7 @@ func (bc *Client) BackupRanges(
go func() {
for _, r := range ranges {
err := bc.backupRange(
ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rateLimit, concurrency, updateCh)
ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rateLimit, concurrency, updateCh, isRawKv, cf)
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -346,6 +348,8 @@ func (bc *Client) backupRange(
rateLimit uint64,
concurrency uint32,
updateCh chan<- struct{},
isRawKv bool,
cf string,
) (err error) {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -381,6 +385,8 @@ func (bc *Client) backupRange(
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
IsRawKv: isRawKv,
Cf: cf,
}
push := newPushDown(ctx, bc.mgr, len(allStores))

Expand All @@ -402,9 +408,19 @@ func (bc *Client) backupRange(

bc.backupMeta.StartVersion = lastBackupTS
bc.backupMeta.EndVersion = backupTS
log.Info("backup time range",
zap.Reflect("StartVersion", lastBackupTS),
zap.Reflect("EndVersion", backupTS))
bc.backupMeta.IsRawKv = isRawKv
if bc.backupMeta.IsRawKv {
bc.backupMeta.RawRanges = append(bc.backupMeta.RawRanges,
&backup.RawRange{StartKey: startKey, EndKey: endKey, Cf: cf})
log.Info("backup raw ranges",
zap.ByteString("startKey", startKey),
zap.ByteString("endKey", endKey),
zap.String("cf", cf))
} else {
log.Info("backup time range",
zap.Reflect("StartVersion", lastBackupTS),
zap.Reflect("EndVersion", backupTS))
}

results.tree.Ascend(func(i btree.Item) bool {
r := i.(*Range)
Expand Down
70 changes: 70 additions & 0 deletions pkg/utils/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package utils

import (
"bytes"
"encoding/hex"
"fmt"
"io"
"strings"

"github.com/pingcap/errors"
"github.com/spf13/pflag"
)

// ParseKey parse key by given format
func ParseKey(flags *pflag.FlagSet, key string) ([]byte, error) {
switch flags.Lookup("format").Value.String() {
case "raw":
return []byte(key), nil
case "escaped":
return unescapedKey(key)
case "hex":
key, err := hex.DecodeString(key)
if err != nil {
return nil, errors.WithStack(err)
}
return key, nil
}
return nil, errors.New("unknown format")
}

func unescapedKey(text string) ([]byte, error) {
var buf []byte
r := bytes.NewBuffer([]byte(text))
for {
c, err := r.ReadByte()
if err != nil {
if err != io.EOF {
return nil, errors.WithStack(err)
}
break
}
if c != '\\' {
buf = append(buf, c)
continue
}
n := r.Next(1)
if len(n) == 0 {
return nil, io.EOF
}
// See: https://golang.org/ref/spec#Rune_literals
if idx := strings.IndexByte(`abfnrtv\'"`, n[0]); idx != -1 {
buf = append(buf, []byte("\a\b\f\n\r\t\v\\'\"")[idx])
continue
}

switch n[0] {
case 'x':
fmt.Sscanf(string(r.Next(2)), "%02x", &c)
buf = append(buf, c)
default:
n = append(n, r.Next(2)...)
_, err := fmt.Sscanf(string(n), "%03o", &c)
if err != nil {
return nil, errors.WithStack(err)
}
buf = append(buf, c)
}
}
return buf, nil
}
41 changes: 41 additions & 0 deletions pkg/utils/key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package utils

import (
"encoding/hex"

. "github.com/pingcap/check"
"github.com/spf13/pflag"
)

type testKeySuite struct{}

var _ = Suite(&testKeySuite{})

func (r *testKeySuite) TestParseKey(c *C) {
flagSet := &pflag.FlagSet{}
flagSet.String("format", "raw", "")
rawKey := "1234"
parsedKey, err := ParseKey(flagSet, rawKey)
c.Assert(err, IsNil)
c.Assert(parsedKey, BytesEquals, []byte(rawKey))

flagSet = &pflag.FlagSet{}
flagSet.String("format", "escaped", "")
escapedKey := "\\a\\x1"
parsedKey, err = ParseKey(flagSet, escapedKey)
c.Assert(err, IsNil)
c.Assert(parsedKey, BytesEquals, []byte("\a\x01"))

flagSet = &pflag.FlagSet{}
flagSet.String("format", "hex", "")
hexKey := hex.EncodeToString([]byte("1234"))
parsedKey, err = ParseKey(flagSet, hexKey)
c.Assert(err, IsNil)
c.Assert(parsedKey, BytesEquals, []byte("1234"))

flagSet = &pflag.FlagSet{}
flagSet.String("format", "notSupport", "")
_, err = ParseKey(flagSet, rawKey)
c.Assert(err, ErrorMatches, "*unknown format*")

}

0 comments on commit 80e615d

Please sign in to comment.