This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

backup: add raw backup command #101

Merged 4 commits on Mar 4, 2020

2 changes: 2 additions & 0 deletions Makefile
@@ -28,6 +28,8 @@ build_for_integration_test:
GO111MODULE=on go build -race -o bin/locker tests/br_key_locked/*.go
# build gc
GO111MODULE=on go build -race -o bin/gc tests/br_z_gc_safepoint/*.go
# build rawkv client
GO111MODULE=on go build -race -o bin/rawkv tests/br_rawkv/*.go

test:
GO111MODULE=on go test -race -tags leak ./...
23 changes: 23 additions & 0 deletions cmd/backup.go
@@ -18,6 +18,14 @@ func runBackupCommand(command *cobra.Command, cmdName string) error {
return task.RunBackup(GetDefaultContext(), tidbGlue, cmdName, &cfg)
}

func runBackupRawCommand(command *cobra.Command, cmdName string) error {
cfg := task.BackupRawConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
return err
}
return task.RunBackupRaw(GetDefaultContext(), tidbGlue, cmdName, &cfg)
}

// NewBackupCommand return a full backup subcommand.
func NewBackupCommand() *cobra.Command {
command := &cobra.Command{
@@ -43,6 +51,7 @@ func NewBackupCommand() *cobra.Command {
newFullBackupCommand(),
newDbBackupCommand(),
newTableBackupCommand(),
newRawBackupCommand(),
)

task.DefineBackupFlags(command.PersistentFlags())
@@ -87,3 +96,17 @@ func newTableBackupCommand() *cobra.Command {
task.DefineTableFlags(command)
return command
}

// newRawBackupCommand return a raw kv range backup subcommand.
func newRawBackupCommand() *cobra.Command {
command := &cobra.Command{
Use: "raw",
Short: "backup a raw kv range from TiKV cluster",
RunE: func(command *cobra.Command, _ []string) error {
return runBackupRawCommand(command, "Raw backup")
},
}

task.DefineRawBackupFlags(command)
return command
}
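
The new subcommand hands all of its configuration to task.BackupRawConfig and task.DefineRawBackupFlags, which live in pkg/task and are not part of this diff. As a rough illustration of how that side might look, here is a minimal Go sketch; every field and flag name below (start, end, cf, StartKey, EndKey, CF) is an assumption, not the PR's actual definition:

package task

import "github.com/spf13/cobra"

// BackupRawConfig could extend the shared Config (the diff shows it carries a
// Config field) with raw-kv-only parameters. Field names here are assumed.
type BackupRawConfig struct {
	Config          // shared task.Config: storage, PD endpoints, LogProgress, ...
	StartKey []byte // decoded raw range start key (assumed)
	EndKey   []byte // decoded raw range end key (assumed)
	CF       string // column family to back up, e.g. "default" (assumed)
}

// DefineRawBackupFlags could register the raw-only flags for "br backup raw".
// Flag names and help texts are illustrative assumptions.
func DefineRawBackupFlags(command *cobra.Command) {
	command.Flags().String("start", "", "raw backup range start key, hex encoded (assumed flag)")
	command.Flags().String("end", "", "raw backup range end key, hex encoded (assumed flag)")
	command.Flags().String("cf", "default", "column family to back up (assumed flag)")
}
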
1 change: 1 addition & 0 deletions go.mod
@@ -28,6 +28,7 @@ require (
github.com/pingcap/tidb-tools v4.0.0-beta+incompatible
github.com/pingcap/tipb v0.0.0-20200212061130-c4d518eb1d60
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.4.1
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
3 changes: 3 additions & 0 deletions go.sum
@@ -26,7 +26,9 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUW
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
@@ -614,6 +616,7 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
gopkg.in/alecthomas/gometalinter.v2 v2.0.12/go.mod h1:NDRytsqEZyolNuAgTzJkZMkSQM7FIKyzVzGhjB/qfYo=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
99 changes: 49 additions & 50 deletions pkg/backup/client.go
@@ -11,7 +11,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
@@ -36,7 +36,7 @@ import (

// ClientMgr manages connections needed by backup.
type ClientMgr interface {
GetBackupClient(ctx context.Context, storeID uint64) (backup.BackupClient, error)
GetBackupClient(ctx context.Context, storeID uint64) (kvproto.BackupClient, error)
GetPDClient() pd.Client
GetTiKV() tikv.Storage
GetLockResolver() *tikv.LockResolver
@@ -53,9 +53,9 @@ type Client struct {
mgr ClientMgr
clusterID uint64

backupMeta backup.BackupMeta
backupMeta kvproto.BackupMeta
storage storage.ExternalStorage
backend *backup.StorageBackend
backend *kvproto.StorageBackend
}

// NewBackupClient returns a new backup client
@@ -101,7 +101,7 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration) (uint64, er
}

// SetStorage set ExternalStorage for client
func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) error {
func (bc *Client) SetStorage(ctx context.Context, backend *kvproto.StorageBackend, sendCreds bool) error {
var err error
bc.storage, err = storage.Create(ctx, backend, sendCreds)
if err != nil {
@@ -222,7 +222,7 @@ func BuildBackupRangeAndSchema(
return nil, nil, errors.Trace(err)
}

schema := backup.Schema{
schema := kvproto.Schema{
Db: dbData,
Table: tableData,
}
@@ -296,10 +296,7 @@ func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*mod
func (bc *Client) BackupRanges(
ctx context.Context,
ranges []Range,
lastBackupTS uint64,
backupTS uint64,
rateLimit uint64,
concurrency uint32,
req kvproto.BackupRequest,
updateCh chan<- struct{},
) error {
start := time.Now()
@@ -313,8 +310,8 @@ func (bc *Client) BackupRanges(
defer cancel()
go func() {
for _, r := range ranges {
err := bc.backupRange(
ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rateLimit, concurrency, updateCh)
err := bc.BackupRange(
ctx, r.StartKey, r.EndKey, req, updateCh)
if err != nil {
errCh <- err
return
@@ -329,7 +326,7 @@

finished := false
for {
err := CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), backupTS)
err := CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), req.EndVersion)
if err != nil {
log.Error("check GC safepoint failed", zap.Error(err))
return err
@@ -353,14 +350,11 @@
}
}

// backupRange make a backup of the given key range.
func (bc *Client) backupRange(
// BackupRange make a backup of the given key range.
func (bc *Client) BackupRange(
ctx context.Context,
startKey, endKey []byte,
lastBackupTS uint64,
backupTS uint64,
rateLimit uint64,
concurrency uint32,
req kvproto.BackupRequest,
updateCh chan<- struct{},
) (err error) {
start := time.Now()
@@ -377,8 +371,8 @@ func (bc *Client) backupRange(
log.Info("backup started",
zap.Binary("StartKey", startKey),
zap.Binary("EndKey", endKey),
zap.Uint64("RateLimit", rateLimit),
zap.Uint32("Concurrency", concurrency))
zap.Uint64("RateLimit", req.RateLimit),
zap.Uint32("Concurrency", req.Concurrency))
ctx, cancel := context.WithCancel(ctx)
defer cancel()

@@ -388,16 +382,11 @@ func (bc *Client) backupRange(
return errors.Trace(err)
}

req := backup.BackupRequest{
ClusterId: bc.clusterID,
StartKey: startKey,
EndKey: endKey,
StartVersion: lastBackupTS,
EndVersion: backupTS,
StorageBackend: bc.backend,
RateLimit: rateLimit,
Concurrency: concurrency,
}
req.ClusterId = bc.clusterID
req.StartKey = startKey
req.EndKey = endKey
req.StorageBackend = bc.backend

push := newPushDown(ctx, bc.mgr, len(allStores))


Review comment (Contributor): It seems you need to set the IsRawKv field of backupMeta below here.

var results RangeTree
@@ -410,17 +399,27 @@
// Find and backup remaining ranges.
// TODO: test fine grained backup.
err = bc.fineGrainedBackup(
ctx, startKey, endKey, lastBackupTS,
backupTS, rateLimit, concurrency, results, updateCh)
ctx, startKey, endKey, req.StartVersion,
req.EndVersion, req.RateLimit, req.Concurrency, results, updateCh)
if err != nil {
return err
}

bc.backupMeta.StartVersion = lastBackupTS
bc.backupMeta.EndVersion = backupTS
log.Info("backup time range",
zap.Reflect("StartVersion", lastBackupTS),
zap.Reflect("EndVersion", backupTS))
bc.backupMeta.StartVersion = req.StartVersion
bc.backupMeta.EndVersion = req.EndVersion
bc.backupMeta.IsRawKv = req.IsRawKv
if req.IsRawKv {
bc.backupMeta.RawRanges = append(bc.backupMeta.RawRanges,
&kvproto.RawRange{StartKey: startKey, EndKey: endKey, Cf: req.Cf})
log.Info("backup raw ranges",
zap.ByteString("startKey", startKey),
zap.ByteString("endKey", endKey),
zap.String("cf", req.Cf))
} else {
log.Info("backup time range",
zap.Reflect("StartVersion", req.StartVersion),
zap.Reflect("EndVersion", req.EndVersion))
}

results.tree.Ascend(func(i btree.Item) bool {
r := i.(*Range)
@@ -479,7 +478,7 @@ func (bc *Client) fineGrainedBackup(
}
log.Info("start fine grained backup", zap.Int("incomplete", len(incomplete)))
// Step2, retry backup on incomplete range
respCh := make(chan *backup.BackupResponse, 4)
respCh := make(chan *kvproto.BackupResponse, 4)
errCh := make(chan error, 4)
retry := make(chan Range, 4)

@@ -566,15 +565,15 @@ func onBackupResponse(
bo *tikv.Backoffer,
backupTS uint64,
lockResolver *tikv.LockResolver,
resp *backup.BackupResponse,
) (*backup.BackupResponse, int, error) {
resp *kvproto.BackupResponse,
) (*kvproto.BackupResponse, int, error) {
log.Debug("onBackupResponse", zap.Reflect("resp", resp))
if resp.Error == nil {
return resp, 0, nil
}
backoffMs := 0
switch v := resp.Error.Detail.(type) {
case *backup.Error_KvError:
case *kvproto.Error_KvError:
if lockErr := v.KvError.Locked; lockErr != nil {
// Try to resolve lock.
log.Warn("backup occur kv error", zap.Reflect("error", v))
@@ -592,7 +591,7 @@ func onBackupResponse(
log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError))
return nil, backoffMs, errors.Errorf("onBackupResponse error %v", v)

case *backup.Error_RegionError:
case *kvproto.Error_RegionError:
regionErr := v.RegionError
// Ignore following errors.
if !(regionErr.EpochNotMatch != nil ||
@@ -610,7 +609,7 @@
// TODO: a better backoff.
backoffMs = 1000 /* 1s */
return nil, backoffMs, nil
case *backup.Error_ClusterIdError:
case *kvproto.Error_ClusterIdError:
log.Error("backup occur cluster ID error",
zap.Reflect("error", v))
err := errors.Errorf("%v", resp.Error)
@@ -631,7 +630,7 @@ func (bc *Client) handleFineGrained(
backupTS uint64,
rateLimit uint64,
concurrency uint32,
respCh chan<- *backup.BackupResponse,
respCh chan<- *kvproto.BackupResponse,
) (int, error) {
leader, pderr := bc.findRegionLeader(ctx, rg.StartKey)
if pderr != nil {
@@ -640,7 +639,7 @@
storeID := leader.GetStoreId()
max := 0

req := backup.BackupRequest{
req := kvproto.BackupRequest{
ClusterId: bc.clusterID,
StartKey: rg.StartKey, // TODO: the range may cross region.
EndKey: rg.EndKey,
@@ -659,7 +658,7 @@
err = SendBackup(
ctx, storeID, client, req,
// Handle responses with the same backoffer.
func(resp *backup.BackupResponse) error {
func(resp *kvproto.BackupResponse) error {
response, backoffMs, err1 :=
onBackupResponse(bo, backupTS, lockResolver, resp)
if err1 != nil {
@@ -684,9 +683,9 @@
func SendBackup(
ctx context.Context,
storeID uint64,
client backup.BackupClient,
req backup.BackupRequest,
respFn func(*backup.BackupResponse) error,
client kvproto.BackupClient,
req kvproto.BackupRequest,
respFn func(*kvproto.BackupResponse) error,
) error {
log.Info("try backup", zap.Any("backup request", req))
ctx, cancel := context.WithCancel(ctx)
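
With this refactor a caller builds one kvproto.BackupRequest up front; BackupRange then fills in ClusterId, the key range, and the storage backend itself, and, when IsRawKv is set, appends the range and column family to backupMeta.RawRanges. A hedged sketch of driving the newly exported signature for a raw range follows; the module import path and the choice to leave StartVersion/EndVersion at zero are assumptions:

package example

import (
	"context"

	kvproto "github.com/pingcap/kvproto/pkg/backup"

	"github.com/pingcap/br/pkg/backup" // assumed module path for this repository
)

// backupRawRange sketches the new call shape of (*Client).BackupRange.
func backupRawRange(
	ctx context.Context,
	client *backup.Client,
	startKey, endKey []byte,
	updateCh chan<- struct{},
) error {
	req := kvproto.BackupRequest{
		IsRawKv: true,      // recorded into backupMeta and RawRanges by BackupRange
		Cf:      "default", // assumed column family name
		// StartVersion/EndVersion presumably stay zero: raw kv has no MVCC time range.
	}
	return client.BackupRange(ctx, startKey, endKey, req, updateCh)
}
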
4 changes: 3 additions & 1 deletion pkg/conn/conn.go
@@ -379,7 +379,9 @@ func (mgr *Mgr) Close() {

// Gracefully shutdown domain so it does not affect other TiDB DDL.
// Must close domain before closing storage, otherwise it gets stuck forever.
mgr.dom.Close()
if mgr.dom != nil {
mgr.dom.Close()
}

atomic.StoreUint32(&tikv.ShuttingDown, 1)
mgr.storage.Close()
10 changes: 9 additions & 1 deletion pkg/task/backup.go
@@ -5,6 +5,7 @@ import (
"time"

"github.com/pingcap/errors"
kvproto "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/filter"
@@ -131,8 +132,15 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := utils.StartProgress(
ctx, cmdName, int64(approximateRegions), !cfg.LogProgress)

req := kvproto.BackupRequest{
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
}
err = client.BackupRanges(
ctx, ranges, cfg.LastBackupTS, backupTS, cfg.RateLimit, cfg.Concurrency, updateCh)
ctx, ranges, req, updateCh)
if err != nil {
return err
}
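
RunBackup above now folds its timestamps, rate limit, and concurrency into a kvproto.BackupRequest before calling BackupRanges. The raw counterpart, task.RunBackupRaw (invoked from cmd/backup.go but not included in this diff), presumably does the same with IsRawKv set and a single explicit key range. A rough outline under that assumption — the config fields, the progress total, and the omitted client/storage setup are guesses, not the PR's code:

package task

import (
	"context"

	kvproto "github.com/pingcap/kvproto/pkg/backup"

	"github.com/pingcap/br/pkg/backup" // assumed module path
	"github.com/pingcap/br/pkg/utils"  // assumed module path
)

// runBackupRawSketch is a hypothetical outline of the task-level raw backup
// path. Only the request shape and the client.BackupRange call mirror what
// this diff introduces; the backup client is assumed to be prepared exactly
// as RunBackup does above.
func runBackupRawSketch(ctx context.Context, cmdName string, client *backup.Client, cfg *BackupRawConfig) error {
	req := kvproto.BackupRequest{
		RateLimit:   cfg.RateLimit,   // assumed to be carried by the embedded Config
		Concurrency: cfg.Concurrency, // assumed to be carried by the embedded Config
		IsRawKv:     true,
		Cf:          cfg.CF, // assumed field, see the config sketch after cmd/backup.go
	}
	// A raw backup covers one explicit range, so the progress total is 1 here
	// purely for illustration.
	updateCh := utils.StartProgress(ctx, cmdName, 1, !cfg.LogProgress)
	return client.BackupRange(ctx, cfg.StartKey, cfg.EndKey, req, updateCh)
}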