This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

[DNM] Test branch for rawkv backup #131

Closed
wants to merge 16 commits
Changes from all commits
130 changes: 98 additions & 32 deletions cmd/backup.go
@@ -1,6 +1,7 @@
package cmd

import (
"bytes"
"context"

"github.com/pingcap/errors"
@@ -25,6 +26,16 @@ const (
flagLastBackupTS = "lastbackupts"
)

type backupContext struct {
db string
table string

isRawKv bool
startKey []byte
endKey []byte
cf string
}

func defineBackupFlags(flagSet *pflag.FlagSet) {
flagSet.StringP(
flagBackupTimeago, "", "",
@@ -44,7 +55,7 @@ func defineBackupFlags(flagSet *pflag.FlagSet) {
_ = flagSet.MarkHidden(flagBackupRateLimitUnit)
}

func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
func runBackup(flagSet *pflag.FlagSet, cmdName string, bc backupContext) error {
ctx, cancel := context.WithCancel(defaultContext)
defer cancel()

@@ -110,10 +121,18 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {

defer summary.Summary(cmdName)

ranges, backupSchemas, err := backup.BuildBackupRangeAndSchema(
mgr.GetDomain(), mgr.GetTiKV(), backupTS, db, table)
if err != nil {
return err
var (
ranges []backup.Range
backupSchemas *backup.Schemas
)
if bc.isRawKv {
ranges = []backup.Range{{StartKey: bc.startKey, EndKey: bc.endKey}}
} else {
ranges, backupSchemas, err = backup.BuildBackupRangeAndSchema(
mgr.GetDomain(), mgr.GetTiKV(), backupTS, bc.db, bc.table)
if err != nil {
return err
}
}

// The number of regions need to backup
@@ -133,38 +152,39 @@ func runBackup(flagSet *pflag.FlagSet, cmdName, db, table string) error {
updateCh := utils.StartProgress(
ctx, cmdName, int64(approximateRegions), !HasLogFile())
err = client.BackupRanges(
ctx, ranges, lastBackupTS, backupTS, ratelimit, concurrency, updateCh)
ctx, ranges, lastBackupTS, backupTS, ratelimit, concurrency, updateCh, bc.isRawKv, bc.cf)
if err != nil {
return err
}
// Backup has finished
close(updateCh)

// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
backupSchemas.SetSkipChecksum(!checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}
if backupSchemas != nil {
// Checksum
backupSchemasConcurrency := backup.DefaultSchemaConcurrency
if backupSchemas.Len() < backupSchemasConcurrency {
backupSchemasConcurrency = backupSchemas.Len()
}
updateCh = utils.StartProgress(
ctx, "Checksum", int64(backupSchemas.Len()), !HasLogFile())
backupSchemas.SetSkipChecksum(!checksum)
backupSchemas.Start(
ctx, mgr.GetTiKV(), backupTS, uint(backupSchemasConcurrency), updateCh)

valid, err := client.FastChecksum()
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum failed!")
err = client.CompleteMeta(backupSchemas)
if err != nil {
return err
}
valid, err := client.FastChecksum()
if err != nil {
return err
}
if !valid {
log.Error("backup FastChecksum failed!")
}
// Checksum has finished
close(updateCh)
}
// Checksum has finished
close(updateCh)

err = client.SaveBackupMeta(ctx)
if err != nil {
@@ -198,6 +218,7 @@ func NewBackupCommand() *cobra.Command {
newFullBackupCommand(),
newDbBackupCommand(),
newTableBackupCommand(),
newRawBackupCommand(),
)

defineBackupFlags(command.PersistentFlags())
@@ -211,7 +232,8 @@ func newFullBackupCommand() *cobra.Command {
Short: "backup all database",
RunE: func(command *cobra.Command, _ []string) error {
// empty db/table means full backup.
return runBackup(command.Flags(), "Full backup", "", "")
bc := backupContext{db: "", table: "", isRawKv: false}
return runBackup(command.Flags(), "Full backup", bc)
},
}
return command
@@ -230,7 +252,8 @@ func newDbBackupCommand() *cobra.Command {
if len(db) == 0 {
return errors.Errorf("empty database name is not allowed")
}
return runBackup(command.Flags(), "Database backup", db, "")
bc := backupContext{db: db, table: "", isRawKv: false}
return runBackup(command.Flags(), "Database backup", bc)
},
}
command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
@@ -259,7 +282,8 @@ func newTableBackupCommand() *cobra.Command {
if len(table) == 0 {
return errors.Errorf("empty table name is not allowed")
}
return runBackup(command.Flags(), "Table backup", db, table)
bc := backupContext{db: db, table: table, isRawKv: false}
return runBackup(command.Flags(), "Table backup", bc)
},
}
command.Flags().StringP(flagDatabase, "", "", "backup a table in the specific db")
@@ -268,3 +292,45 @@
_ = command.MarkFlagRequired(flagTable)
return command
}

// newRawBackupCommand returns a raw kv range backup subcommand.
func newRawBackupCommand() *cobra.Command {
command := &cobra.Command{
Use: "raw",
Short: "backup a raw kv range from TiKV cluster",
RunE: func(command *cobra.Command, _ []string) error {
start, err := command.Flags().GetString("start")
if err != nil {
return err
}
startKey, err := utils.ParseKey(command.Flags(), start)
if err != nil {
return err
}
end, err := command.Flags().GetString("end")
if err != nil {
return err
}
endKey, err := utils.ParseKey(command.Flags(), end)
if err != nil {
return err
}

cf, err := command.Flags().GetString("cf")
if err != nil {
return err
}

if bytes.Compare(startKey, endKey) > 0 {
return errors.New("input endKey must greater or equal than startKey")
}
bc := backupContext{startKey: startKey, endKey: endKey, isRawKv: true, cf: cf}
return runBackup(command.Flags(), "Raw Backup", bc)
},
}
command.Flags().StringP("format", "", "raw", "raw key format")
command.Flags().StringP("cf", "", "default", "backup raw kv cf")
command.Flags().StringP("start", "", "", "backup raw kv start key")
command.Flags().StringP("end", "", "", "backup raw kv end key")
return command
}
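
A note on the range check above: the new `raw` subcommand refuses a range whose start key sorts after its end key before it ever builds a backupContext. The standalone sketch below isolates that check; the hex decoding and the validateRawKVRange helper are illustrative assumptions only, while in the patch the keys are parsed through utils.ParseKey and the --format flag.

package main

import (
	"bytes"
	"encoding/hex"
	"errors"
	"fmt"
)

// validateRawKVRange is a hypothetical helper mirroring the check in
// newRawBackupCommand: decode both keys, then reject ranges whose start
// key sorts after the end key.
func validateRawKVRange(startHex, endHex string) (startKey, endKey []byte, err error) {
	if startKey, err = hex.DecodeString(startHex); err != nil {
		return nil, nil, err
	}
	if endKey, err = hex.DecodeString(endHex); err != nil {
		return nil, nil, err
	}
	if bytes.Compare(startKey, endKey) > 0 {
		return nil, nil, errors.New("input endKey must be greater than or equal to startKey")
	}
	return startKey, endKey, nil
}

func main() {
	// Keys "a" and "z" in hex; a valid range would then be wrapped in a
	// backupContext{isRawKv: true, ...} as the patch does.
	start, end, err := validateRawKVRange("61", "7a")
	if err != nil {
		fmt.Println("invalid range:", err)
		return
	}
	fmt.Printf("backing up raw range [%x, %x)\n", start, end)
}

In practice the range and column family come from the new --start, --end, --cf, and --format flags registered on the command above.
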
115 changes: 115 additions & 0 deletions cmd/restore.go
@@ -8,6 +8,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
restore_util "github.com/pingcap/tidb-tools/pkg/restore-util"
"github.com/pingcap/tidb/session"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
@@ -53,6 +54,7 @@ func NewRestoreCommand() *cobra.Command {
newFullRestoreCommand(),
newDbRestoreCommand(),
newTableRestoreCommand(),
newRawRestoreCommand(),
)

command.PersistentFlags().Uint("concurrency", 128,
@@ -90,6 +92,10 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error
return errors.Trace(err)
}

if client.IsRawKvMode() {
return errors.New("cannot do full restore from raw kv data")
}

files := make([]*backup.File, 0)
tables := make([]*utils.Table, 0)

@@ -214,6 +220,78 @@ func runRestore(flagSet *flag.FlagSet, cmdName, dbName, tableName string) error
return nil
}

func runRawRestore(flagSet *flag.FlagSet, startKey, endKey []byte, cf string) error {
ctx, cancel := context.WithCancel(GetDefaultContext())
defer cancel()

mgr, err := GetDefaultMgr()
if err != nil {
return err
}
defer mgr.Close()

client, err := restore.NewRestoreClient(
ctx, mgr.GetPDClient(), mgr.GetTiKV())
if err != nil {
return errors.Trace(err)
}
defer client.Close()
err = initRestoreClient(ctx, client, flagSet)
if err != nil {
return errors.Trace(err)
}

if !client.IsRawKvMode() {
return errors.New("cannot do raw restore from transactional data")
}

files, err := client.GetFilesInRawRange(startKey, endKey, cf)
if err != nil {
return errors.Trace(err)
}

// Empty rewrite rules
rewriteRules := &restore_util.RewriteRules{}

ranges, err := restore.ValidateFileRanges(files, rewriteRules)
if err != nil {
return errors.Trace(err)
}

// Redirect to log if there is no log file to avoid unreadable output.
// TODO: How to show progress?
updateCh := utils.StartProgress(
ctx,
"Table Restore",
// Split/Scatter + Download/Ingest
int64(len(ranges)+len(files)),
!HasLogFile())

err = restore.SplitRanges(ctx, client, ranges, rewriteRules, updateCh)
if err != nil {
return errors.Trace(err)
}

removedSchedulers, err := RestorePrepareWork(ctx, client, mgr)
if err != nil {
return errors.Trace(err)
}

err = client.RestoreRaw(startKey, endKey, files, updateCh)
if err != nil {
return errors.Trace(err)
}

err = RestorePostWork(ctx, client, mgr, removedSchedulers)
if err != nil {
return errors.Trace(err)
}
// Restore has finished.
close(updateCh)

return nil
}

func newFullRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "full",
@@ -276,6 +354,43 @@ func newTableRestoreCommand() *cobra.Command {
return command
}

func newRawRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "raw",
Short: "restore a raw kv range",
RunE: func(cmd *cobra.Command, _ []string) error {
start, err := cmd.Flags().GetString("start")
if err != nil {
return err
}
startKey, err := utils.ParseKey(cmd.Flags(), start)
if err != nil {
return err
}
end, err := cmd.Flags().GetString("end")
if err != nil {
return err
}
endKey, err := utils.ParseKey(cmd.Flags(), end)
if err != nil {
return err
}

cf, err := cmd.Flags().GetString("cf")
if err != nil {
return errors.Trace(err)
}
return runRawRestore(cmd.Flags(), startKey, endKey, cf)
},
}

command.Flags().StringP("format", "", "raw", "format of raw keys in arguments")
command.Flags().StringP("start", "", "", "restore raw kv start key")
command.Flags().StringP("end", "", "", "restore raw kv end key")
command.Flags().StringP("cf", "", "default", "the cf to restore raw keys")
return command
}

func initRestoreClient(ctx context.Context, client *restore.Client, flagSet *flag.FlagSet) error {
u, err := storage.ParseBackendFromFlags(flagSet, FlagStorage)
if err != nil {
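
A note on the mode guards above: runRestore now rejects a backup taken in raw-kv mode, while runRawRestore rejects one that was not, so neither restore path can be applied to the wrong kind of backup. The sketch below isolates that guard logic; the rawModeChecker interface and fakeClient type are illustrative stand-ins for restore.Client, not part of the patch.

package main

import (
	"errors"
	"fmt"
)

// rawModeChecker is a hypothetical stand-in for the single method of
// restore.Client that the guards rely on.
type rawModeChecker interface {
	IsRawKvMode() bool
}

type fakeClient struct{ raw bool }

func (c fakeClient) IsRawKvMode() bool { return c.raw }

// guardFullRestore mirrors the check added to runRestore.
func guardFullRestore(c rawModeChecker) error {
	if c.IsRawKvMode() {
		return errors.New("cannot do full restore from raw kv data")
	}
	return nil
}

// guardRawRestore mirrors the check added to runRawRestore.
func guardRawRestore(c rawModeChecker) error {
	if !c.IsRawKvMode() {
		return errors.New("cannot do raw restore from transactional data")
	}
	return nil
}

func main() {
	rawBackup := fakeClient{raw: true}
	fmt.Println(guardFullRestore(rawBackup)) // full restore refuses raw-kv data
	fmt.Println(guardRawRestore(rawBackup))  // raw restore accepts it: <nil>
}

In the patch both checks run right after the restore client is initialized, before any backup files are fetched.
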
2 changes: 1 addition & 1 deletion go.mod
@@ -19,7 +19,7 @@ require (
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4
github.com/pingcap/errors v0.11.4
github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c
github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191210060830-bdf23a7ade01
github.com/pingcap/pd v1.1.0-beta.0.20191212045800-234784c7a9c5
4 changes: 2 additions & 2 deletions go.sum
@@ -268,8 +268,8 @@ github.com/pingcap/kvproto v0.0.0-20191030021250-51b332bcb20b/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20191121022655-4c654046831d/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03 h1:IyJl+qesVPf3UfFFmKtX69y1K5KC8uXlot3U0QgH7V4=
github.com/pingcap/kvproto v0.0.0-20191202044712-32be31591b03/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c h1:CwVCq7XA/NvTQ6X9ZAhZlvcEvseUsHiPFQf2mL3LVl4=
github.com/pingcap/kvproto v0.0.0-20191212110315-d6a9d626988c/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7 h1:thLL2vFObG8vxBCkAmfAbLVBPfXUkBSXaVxppStCrL0=
github.com/pingcap/kvproto v0.0.0-20191217072959-393e6c0fd4b7/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=