diff --git a/Makefile b/Makefile index a62c2db96..eea680b74 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,8 @@ build_for_integration_test: GO111MODULE=on go build -race -o bin/locker tests/br_key_locked/*.go # build gc GO111MODULE=on go build -race -o bin/gc tests/br_z_gc_safepoint/*.go + # build rawkv client + GO111MODULE=on go build -race -o bin/rawkv tests/br_rawkv/*.go test: GO111MODULE=on go test -race -tags leak ./... diff --git a/cmd/backup.go b/cmd/backup.go index 8ae45270c..a0a6bcecb 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -18,6 +18,14 @@ func runBackupCommand(command *cobra.Command, cmdName string) error { return task.RunBackup(GetDefaultContext(), tidbGlue, cmdName, &cfg) } +func runBackupRawCommand(command *cobra.Command, cmdName string) error { + cfg := task.BackupRawConfig{Config: task.Config{LogProgress: HasLogFile()}} + if err := cfg.ParseFromFlags(command.Flags()); err != nil { + return err + } + return task.RunBackupRaw(GetDefaultContext(), tidbGlue, cmdName, &cfg) +} + // NewBackupCommand return a full backup subcommand. func NewBackupCommand() *cobra.Command { command := &cobra.Command{ @@ -43,6 +51,7 @@ func NewBackupCommand() *cobra.Command { newFullBackupCommand(), newDbBackupCommand(), newTableBackupCommand(), + newRawBackupCommand(), ) task.DefineBackupFlags(command.PersistentFlags()) @@ -87,3 +96,17 @@ func newTableBackupCommand() *cobra.Command { task.DefineTableFlags(command) return command } + +// newRawBackupCommand return a raw kv range backup subcommand. +func newRawBackupCommand() *cobra.Command { + command := &cobra.Command{ + Use: "raw", + Short: "backup a raw kv range from TiKV cluster", + RunE: func(command *cobra.Command, _ []string) error { + return runBackupRawCommand(command, "Raw backup") + }, + } + + task.DefineRawBackupFlags(command) + return command +} diff --git a/go.mod b/go.mod index 6720e3602..ebad44174 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/pingcap/tidb-tools v4.0.0-beta+incompatible github.com/pingcap/tipb v0.0.0-20200212061130-c4d518eb1d60 github.com/prometheus/client_golang v1.0.0 + github.com/prometheus/common v0.4.1 github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index ca889933a..26526840e 100644 --- a/go.sum +++ b/go.sum @@ -26,7 +26,9 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUW github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -614,6 +616,7 @@ google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRn google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= gopkg.in/alecthomas/gometalinter.v2 v2.0.12/go.mod h1:NDRytsqEZyolNuAgTzJkZMkSQM7FIKyzVzGhjB/qfYo= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mod h1:3HH7i1SgMqlzxCcBmUHW657sD4Kvv9sC3HpL3YukzwA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 6d6eff033..fb2960962 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -11,7 +11,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/google/btree" "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/backup" + kvproto "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/parser/model" @@ -36,7 +36,7 @@ import ( // ClientMgr manages connections needed by backup. type ClientMgr interface { - GetBackupClient(ctx context.Context, storeID uint64) (backup.BackupClient, error) + GetBackupClient(ctx context.Context, storeID uint64) (kvproto.BackupClient, error) GetPDClient() pd.Client GetTiKV() tikv.Storage GetLockResolver() *tikv.LockResolver @@ -53,9 +53,9 @@ type Client struct { mgr ClientMgr clusterID uint64 - backupMeta backup.BackupMeta + backupMeta kvproto.BackupMeta storage storage.ExternalStorage - backend *backup.StorageBackend + backend *kvproto.StorageBackend } // NewBackupClient returns a new backup client @@ -101,7 +101,7 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration) (uint64, er } // SetStorage set ExternalStorage for client -func (bc *Client) SetStorage(ctx context.Context, backend *backup.StorageBackend, sendCreds bool) error { +func (bc *Client) SetStorage(ctx context.Context, backend *kvproto.StorageBackend, sendCreds bool) error { var err error bc.storage, err = storage.Create(ctx, backend, sendCreds) if err != nil { @@ -222,7 +222,7 @@ func BuildBackupRangeAndSchema( return nil, nil, errors.Trace(err) } - schema := backup.Schema{ + schema := kvproto.Schema{ Db: dbData, Table: tableData, } @@ -296,10 +296,7 @@ func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*mod func (bc *Client) BackupRanges( ctx context.Context, ranges []Range, - lastBackupTS uint64, - backupTS uint64, - rateLimit uint64, - concurrency uint32, + req kvproto.BackupRequest, updateCh chan<- struct{}, ) error { start := time.Now() @@ -313,8 +310,8 @@ func (bc *Client) BackupRanges( defer cancel() go func() { for _, r := range ranges { - err := bc.backupRange( - ctx, r.StartKey, r.EndKey, lastBackupTS, backupTS, rateLimit, concurrency, updateCh) + err := bc.BackupRange( + ctx, r.StartKey, r.EndKey, req, updateCh) if err != nil { errCh <- err return @@ -329,7 +326,7 @@ func (bc *Client) BackupRanges( finished := false for { - err := CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), backupTS) + err := CheckGCSafepoint(ctx, bc.mgr.GetPDClient(), req.EndVersion) if err != nil { log.Error("check GC safepoint failed", zap.Error(err)) return err @@ -353,14 +350,11 @@ func (bc *Client) BackupRanges( } } -// backupRange make a backup of the given key range. -func (bc *Client) backupRange( +// BackupRange make a backup of the given key range. +func (bc *Client) BackupRange( ctx context.Context, startKey, endKey []byte, - lastBackupTS uint64, - backupTS uint64, - rateLimit uint64, - concurrency uint32, + req kvproto.BackupRequest, updateCh chan<- struct{}, ) (err error) { start := time.Now() @@ -377,8 +371,8 @@ func (bc *Client) backupRange( log.Info("backup started", zap.Binary("StartKey", startKey), zap.Binary("EndKey", endKey), - zap.Uint64("RateLimit", rateLimit), - zap.Uint32("Concurrency", concurrency)) + zap.Uint64("RateLimit", req.RateLimit), + zap.Uint32("Concurrency", req.Concurrency)) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -388,16 +382,11 @@ func (bc *Client) backupRange( return errors.Trace(err) } - req := backup.BackupRequest{ - ClusterId: bc.clusterID, - StartKey: startKey, - EndKey: endKey, - StartVersion: lastBackupTS, - EndVersion: backupTS, - StorageBackend: bc.backend, - RateLimit: rateLimit, - Concurrency: concurrency, - } + req.ClusterId = bc.clusterID + req.StartKey = startKey + req.EndKey = endKey + req.StorageBackend = bc.backend + push := newPushDown(ctx, bc.mgr, len(allStores)) var results RangeTree @@ -410,17 +399,27 @@ func (bc *Client) backupRange( // Find and backup remaining ranges. // TODO: test fine grained backup. err = bc.fineGrainedBackup( - ctx, startKey, endKey, lastBackupTS, - backupTS, rateLimit, concurrency, results, updateCh) + ctx, startKey, endKey, req.StartVersion, + req.EndVersion, req.RateLimit, req.Concurrency, results, updateCh) if err != nil { return err } - bc.backupMeta.StartVersion = lastBackupTS - bc.backupMeta.EndVersion = backupTS - log.Info("backup time range", - zap.Reflect("StartVersion", lastBackupTS), - zap.Reflect("EndVersion", backupTS)) + bc.backupMeta.StartVersion = req.StartVersion + bc.backupMeta.EndVersion = req.EndVersion + bc.backupMeta.IsRawKv = req.IsRawKv + if req.IsRawKv { + bc.backupMeta.RawRanges = append(bc.backupMeta.RawRanges, + &kvproto.RawRange{StartKey: startKey, EndKey: endKey, Cf: req.Cf}) + log.Info("backup raw ranges", + zap.ByteString("startKey", startKey), + zap.ByteString("endKey", endKey), + zap.String("cf", req.Cf)) + } else { + log.Info("backup time range", + zap.Reflect("StartVersion", req.StartVersion), + zap.Reflect("EndVersion", req.EndVersion)) + } results.tree.Ascend(func(i btree.Item) bool { r := i.(*Range) @@ -479,7 +478,7 @@ func (bc *Client) fineGrainedBackup( } log.Info("start fine grained backup", zap.Int("incomplete", len(incomplete))) // Step2, retry backup on incomplete range - respCh := make(chan *backup.BackupResponse, 4) + respCh := make(chan *kvproto.BackupResponse, 4) errCh := make(chan error, 4) retry := make(chan Range, 4) @@ -566,15 +565,15 @@ func onBackupResponse( bo *tikv.Backoffer, backupTS uint64, lockResolver *tikv.LockResolver, - resp *backup.BackupResponse, -) (*backup.BackupResponse, int, error) { + resp *kvproto.BackupResponse, +) (*kvproto.BackupResponse, int, error) { log.Debug("onBackupResponse", zap.Reflect("resp", resp)) if resp.Error == nil { return resp, 0, nil } backoffMs := 0 switch v := resp.Error.Detail.(type) { - case *backup.Error_KvError: + case *kvproto.Error_KvError: if lockErr := v.KvError.Locked; lockErr != nil { // Try to resolve lock. log.Warn("backup occur kv error", zap.Reflect("error", v)) @@ -592,7 +591,7 @@ func onBackupResponse( log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError)) return nil, backoffMs, errors.Errorf("onBackupResponse error %v", v) - case *backup.Error_RegionError: + case *kvproto.Error_RegionError: regionErr := v.RegionError // Ignore following errors. if !(regionErr.EpochNotMatch != nil || @@ -610,7 +609,7 @@ func onBackupResponse( // TODO: a better backoff. backoffMs = 1000 /* 1s */ return nil, backoffMs, nil - case *backup.Error_ClusterIdError: + case *kvproto.Error_ClusterIdError: log.Error("backup occur cluster ID error", zap.Reflect("error", v)) err := errors.Errorf("%v", resp.Error) @@ -631,7 +630,7 @@ func (bc *Client) handleFineGrained( backupTS uint64, rateLimit uint64, concurrency uint32, - respCh chan<- *backup.BackupResponse, + respCh chan<- *kvproto.BackupResponse, ) (int, error) { leader, pderr := bc.findRegionLeader(ctx, rg.StartKey) if pderr != nil { @@ -640,7 +639,7 @@ func (bc *Client) handleFineGrained( storeID := leader.GetStoreId() max := 0 - req := backup.BackupRequest{ + req := kvproto.BackupRequest{ ClusterId: bc.clusterID, StartKey: rg.StartKey, // TODO: the range may cross region. EndKey: rg.EndKey, @@ -659,7 +658,7 @@ func (bc *Client) handleFineGrained( err = SendBackup( ctx, storeID, client, req, // Handle responses with the same backoffer. - func(resp *backup.BackupResponse) error { + func(resp *kvproto.BackupResponse) error { response, backoffMs, err1 := onBackupResponse(bo, backupTS, lockResolver, resp) if err1 != nil { @@ -684,9 +683,9 @@ func (bc *Client) handleFineGrained( func SendBackup( ctx context.Context, storeID uint64, - client backup.BackupClient, - req backup.BackupRequest, - respFn func(*backup.BackupResponse) error, + client kvproto.BackupClient, + req kvproto.BackupRequest, + respFn func(*kvproto.BackupResponse) error, ) error { log.Info("try backup", zap.Any("backup request", req)) ctx, cancel := context.WithCancel(ctx) diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index d1f7858f6..6869c1199 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -379,7 +379,9 @@ func (mgr *Mgr) Close() { // Gracefully shutdown domain so it does not affect other TiDB DDL. // Must close domain before closing storage, otherwise it gets stuck forever. - mgr.dom.Close() + if mgr.dom != nil { + mgr.dom.Close() + } atomic.StoreUint32(&tikv.ShuttingDown, 1) mgr.storage.Close() diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 31594a08f..2d9468394 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -5,6 +5,7 @@ import ( "time" "github.com/pingcap/errors" + kvproto "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/filter" @@ -131,8 +132,15 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig // Redirect to log if there is no log file to avoid unreadable output. updateCh := utils.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) + + req := kvproto.BackupRequest{ + StartVersion: cfg.LastBackupTS, + EndVersion: backupTS, + RateLimit: cfg.RateLimit, + Concurrency: cfg.Concurrency, + } err = client.BackupRanges( - ctx, ranges, cfg.LastBackupTS, backupTS, cfg.RateLimit, cfg.Concurrency, updateCh) + ctx, ranges, req, updateCh) if err != nil { return err } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go new file mode 100644 index 000000000..51d5267a5 --- /dev/null +++ b/pkg/task/backup_raw.go @@ -0,0 +1,142 @@ +package task + +import ( + "bytes" + "context" + + "github.com/pingcap/errors" + kvproto "github.com/pingcap/kvproto/pkg/backup" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + "github.com/pingcap/br/pkg/backup" + "github.com/pingcap/br/pkg/glue" + "github.com/pingcap/br/pkg/storage" + "github.com/pingcap/br/pkg/summary" + "github.com/pingcap/br/pkg/utils" +) + +const ( + flagKeyFormat = "format" + flagTiKVColumnFamily = "cf" + flagStartKey = "start" + flagEndKey = "end" +) + +// BackupRawConfig is the configuration specific for backup tasks. +type BackupRawConfig struct { + Config + + StartKey []byte `json:"start-key" toml:"start-key"` + EndKey []byte `json:"end-key" toml:"end-key"` + CF string `json:"cf" toml:"cf"` +} + +// DefineRawBackupFlags defines common flags for the backup command. +func DefineRawBackupFlags(command *cobra.Command) { + command.Flags().StringP(flagKeyFormat, "", "hex", "start/end key format, support raw|escaped|hex") + command.Flags().StringP(flagTiKVColumnFamily, "", "default", "backup specify cf, correspond to tikv cf") + command.Flags().StringP(flagStartKey, "", "", "backup raw kv start key, key is inclusive") + command.Flags().StringP(flagEndKey, "", "", "backup raw kv end key, key is exclusive") +} + +// ParseFromFlags parses the backup-related flags from the flag set. +func (cfg *BackupRawConfig) ParseFromFlags(flags *pflag.FlagSet) error { + format, err := flags.GetString(flagKeyFormat) + if err != nil { + return err + } + start, err := flags.GetString(flagStartKey) + if err != nil { + return err + } + cfg.StartKey, err = utils.ParseKey(format, start) + if err != nil { + return err + } + end, err := flags.GetString(flagEndKey) + if err != nil { + return err + } + cfg.EndKey, err = utils.ParseKey(format, end) + if err != nil { + return err + } + + if bytes.Compare(cfg.StartKey, cfg.EndKey) >= 0 { + return errors.New("endKey must be greater than startKey") + } + + cfg.CF, err = flags.GetString(flagTiKVColumnFamily) + if err != nil { + return err + } + if err = cfg.Config.ParseFromFlags(flags); err != nil { + return errors.Trace(err) + } + return nil +} + +// RunBackupRaw starts a backup task inside the current goroutine. +func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *BackupRawConfig) error { + ctx, cancel := context.WithCancel(c) + defer cancel() + + u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) + if err != nil { + return err + } + mgr, err := newMgr(ctx, g, cfg.PD, cfg.TLS) + if err != nil { + return err + } + defer mgr.Close() + + client, err := backup.NewBackupClient(ctx, mgr) + if err != nil { + return err + } + if err = client.SetStorage(ctx, u, cfg.SendCreds); err != nil { + return err + } + + defer summary.Summary(cmdName) + + backupRange := backup.Range{StartKey: cfg.StartKey, EndKey: cfg.EndKey} + + // The number of regions need to backup + approximateRegions, err := mgr.GetRegionCount(ctx, backupRange.StartKey, backupRange.EndKey) + if err != nil { + return err + } + + summary.CollectInt("backup total regions", approximateRegions) + + // Backup + // Redirect to log if there is no log file to avoid unreadable output. + updateCh := utils.StartProgress( + ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) + + req := kvproto.BackupRequest{ + StartVersion: 0, + EndVersion: 0, + RateLimit: cfg.RateLimit, + Concurrency: cfg.Concurrency, + IsRawKv: true, + Cf: cfg.CF, + } + + err = client.BackupRange(ctx, backupRange.StartKey, backupRange.EndKey, req, updateCh) + if err != nil { + return err + } + // Backup has finished + close(updateCh) + + // Checksum + err = client.SaveBackupMeta(ctx, nil) + if err != nil { + return err + } + return nil +} diff --git a/pkg/utils/key.go b/pkg/utils/key.go new file mode 100644 index 000000000..8ed1109b0 --- /dev/null +++ b/pkg/utils/key.go @@ -0,0 +1,70 @@ +package utils + +import ( + "bytes" + "encoding/hex" + "fmt" + "io" + "strings" + + "github.com/pingcap/errors" +) + +// ParseKey parse key by given format +func ParseKey(format, key string) ([]byte, error) { + switch format { + case "raw": + return []byte(key), nil + case "escaped": + return unescapedKey(key) + case "hex": + key, err := hex.DecodeString(key) + if err != nil { + return nil, errors.WithStack(err) + } + return key, nil + } + return nil, errors.New("unknown format") +} + +// Ref PD: https://github.com/pingcap/pd/blob/master/tools/pd-ctl/pdctl/command/region_command.go#L334 +func unescapedKey(text string) ([]byte, error) { + var buf []byte + r := bytes.NewBuffer([]byte(text)) + for { + c, err := r.ReadByte() + if err != nil { + if err != io.EOF { + return nil, errors.WithStack(err) + } + break + } + if c != '\\' { + buf = append(buf, c) + continue + } + n := r.Next(1) + if len(n) == 0 { + return nil, io.EOF + } + // See: https://golang.org/ref/spec#Rune_literals + if idx := strings.IndexByte(`abfnrtv\'"`, n[0]); idx != -1 { + buf = append(buf, []byte("\a\b\f\n\r\t\v\\'\"")[idx]) + continue + } + + switch n[0] { + case 'x': + fmt.Sscanf(string(r.Next(2)), "%02x", &c) + buf = append(buf, c) + default: + n = append(n, r.Next(2)...) + _, err := fmt.Sscanf(string(n), "%03o", &c) + if err != nil { + return nil, errors.WithStack(err) + } + buf = append(buf, c) + } + } + return buf, nil +} diff --git a/pkg/utils/key_test.go b/pkg/utils/key_test.go new file mode 100644 index 000000000..092962135 --- /dev/null +++ b/pkg/utils/key_test.go @@ -0,0 +1,32 @@ +package utils + +import ( + "encoding/hex" + + . "github.com/pingcap/check" +) + +type testKeySuite struct{} + +var _ = Suite(&testKeySuite{}) + +func (r *testKeySuite) TestParseKey(c *C) { + rawKey := "1234" + parsedKey, err := ParseKey("raw", rawKey) + c.Assert(err, IsNil) + c.Assert(parsedKey, BytesEquals, []byte(rawKey)) + + escapedKey := "\\a\\x1" + parsedKey, err = ParseKey("escaped", escapedKey) + c.Assert(err, IsNil) + c.Assert(parsedKey, BytesEquals, []byte("\a\x01")) + + hexKey := hex.EncodeToString([]byte("1234")) + parsedKey, err = ParseKey("hex", hexKey) + c.Assert(err, IsNil) + c.Assert(parsedKey, BytesEquals, []byte("1234")) + + _, err = ParseKey("notSupport", rawKey) + c.Assert(err, ErrorMatches, "*unknown format*") + +} diff --git a/tests/br_rawkv/client.go b/tests/br_rawkv/client.go new file mode 100644 index 000000000..bd13839f6 --- /dev/null +++ b/tests/br_rawkv/client.go @@ -0,0 +1,325 @@ +package main + +import ( + "bytes" + "encoding/hex" + "flag" + "fmt" + "hash/crc64" + "math/rand" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/store/tikv" + "github.com/prometheus/common/log" +) + +var ( + pdAddr = flag.String("pd", "127.0.0.1:2379", "Address of PD") + runMode = flag.String("mode", "", "Mode. One of 'rand-gen', 'checksum', 'scan' and 'diff'") + startKeyStr = flag.String("start-key", "", "Start key in hex") + endKeyStr = flag.String("end-key", "", "End key in hex") + keyMaxLen = flag.Int("key-max-len", 32, "Max length of keys for rand-gen mode") + concurrency = flag.Int("concurrency", 32, "Concurrency to run rand-gen") + duration = flag.Int("duration", 10, "duration(second) of rand-gen") +) + +func createClient(addr string) (*tikv.RawKVClient, error) { + cli, err := tikv.NewRawKVClient([]string{addr}, config.Security{}) + return cli, err +} + +func main() { + flag.Parse() + + startKey, err := hex.DecodeString(*startKeyStr) + if err != nil { + log.Fatalf("Invalid startKey: %v, err: %+v", startKeyStr, err) + } + endKey, err := hex.DecodeString(*endKeyStr) + if err != nil { + log.Fatalf("Invalid endKey: %v, err: %+v", endKeyStr, err) + } + if len(endKey) == 0 { + log.Fatal("Empty endKey is not supported yet") + } + + if *runMode == "test-rand-key" { + testRandKey(startKey, endKey, *keyMaxLen) + return + } + + client, err := createClient(*pdAddr) + if err != nil { + log.Fatalf("Failed to create client to %v, err: %+v", *pdAddr, err) + } + + switch *runMode { + case "rand-gen": + err = randGenWithDuration(client, startKey, endKey, *keyMaxLen, *concurrency, *duration) + case "checksum": + err = checksum(client, startKey, endKey) + case "scan": + err = scan(client, startKey, endKey) + case "delete": + err = deleteRange(client, startKey, endKey) + } + + if err != nil { + log.Fatalf("Error: %+v", err) + } +} + +func randGenWithDuration(client *tikv.RawKVClient, startKey, endKey []byte, + maxLen int, concurrency int, duration int) error { + var err error + ok := make(chan struct{}) + go func() { + err = randGen(client, startKey, endKey, maxLen, concurrency) + ok <- struct{}{} + }() + select { + case <-time.After(time.Second * time.Duration(duration)): + case <-ok: + } + return err +} + +func randGen(client *tikv.RawKVClient, startKey, endKey []byte, maxLen int, concurrency int) error { + log.Infof("Start rand-gen from %v to %v, maxLen %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey), maxLen) + log.Infof("Rand-gen will keep running. Please Ctrl+C to stop manually.") + + // Cannot generate shorter key than commonPrefix + commonPrefixLen := 0 + for ; commonPrefixLen < len(startKey) && commonPrefixLen < len(endKey) && + startKey[commonPrefixLen] == endKey[commonPrefixLen]; commonPrefixLen++ { + continue + } + + if maxLen < commonPrefixLen { + return errors.Errorf("maxLen (%v) < commonPrefixLen (%v)", maxLen, commonPrefixLen) + } + + const batchSize = 32 + + errCh := make(chan error, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + for { + keys := make([][]byte, 0, batchSize) + values := make([][]byte, 0, batchSize) + + for i := 0; i < batchSize; i++ { + key := randKey(startKey, endKey, maxLen) + keys = append(keys, key) + value := randValue() + values = append(values, value) + } + + err := client.BatchPut(keys, values) + if err != nil { + errCh <- errors.Trace(err) + } + } + }() + } + + err := <-errCh + if err != nil { + return errors.Trace(err) + } + + return nil +} + +func testRandKey(startKey, endKey []byte, maxLen int) { + for { + k := randKey(startKey, endKey, maxLen) + if bytes.Compare(k, startKey) < 0 || bytes.Compare(k, endKey) >= 0 { + panic(hex.EncodeToString(k)) + } + } +} + +func randKey(startKey, endKey []byte, maxLen int) []byte { +Retry: + for { // Regenerate on fail + result := make([]byte, 0, maxLen) + + upperUnbounded := false + lowerUnbounded := false + + for i := 0; i < maxLen; i++ { + upperBound := 256 + if !upperUnbounded { + if i >= len(endKey) { + // The generated key is the same as endKey which is invalid. Regenerate it. + continue Retry + } + upperBound = int(endKey[i]) + 1 + } + + lowerBound := 0 + if !lowerUnbounded { + if i >= len(startKey) { + lowerUnbounded = true + } else { + lowerBound = int(startKey[i]) + } + } + + if lowerUnbounded { + if rand.Intn(257) == 0 { + return result + } + } + + value := rand.Intn(upperBound - lowerBound) + value += lowerBound + + if value < upperBound-1 { + upperUnbounded = true + } + if value > lowerBound { + lowerUnbounded = true + } + + result = append(result, uint8(value)) + } + + return result + } +} + +func randValue() []byte { + result := make([]byte, 0, 512) + for i := 0; i < 512; i++ { + value := rand.Intn(257) + if value == 256 { + if i > 0 { + return result + } + value-- + } + result = append(result, uint8(value)) + } + return result +} + +func checksum(client *tikv.RawKVClient, startKey, endKey []byte) error { + log.Infof("Start checkcum on range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey)) + + scanner := newRawKVScanner(client, startKey, endKey) + digest := crc64.New(crc64.MakeTable(crc64.ECMA)) + + var res uint64 + + for { + k, v, err := scanner.Next() + if err != nil { + return errors.Trace(err) + } + if len(k) == 0 { + break + } + _, _ = digest.Write(k) + _, _ = digest.Write(v) + res ^= digest.Sum64() + } + + fmt.Printf("Checksum result: %016x\n", res) + return nil +} + +func deleteRange(client *tikv.RawKVClient, startKey, endKey []byte) error { + log.Infof("Start delete data in range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey)) + return client.DeleteRange(startKey, endKey) +} + +func scan(client *tikv.RawKVClient, startKey, endKey []byte) error { + log.Infof("Start scanning data in range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey)) + + scanner := newRawKVScanner(client, startKey, endKey) + + var key []byte + for { + k, v, err := scanner.Next() + if err != nil { + return errors.Trace(err) + } + if len(k) == 0 { + break + } + fmt.Printf("key: %v, value: %v\n", hex.EncodeToString(k), hex.EncodeToString(v)) + if bytes.Compare(key, k) >= 0 { + log.Errorf("Scan result is not in order. "+ + "Previous key: %v, Current key: %v", + hex.EncodeToString(key), hex.EncodeToString(k)) + } + } + + log.Infof("Finished Scanning.") + return nil +} + +const defaultScanBatchSize = 128 + +type rawKVScanner struct { + client *tikv.RawKVClient + batchSize int + + currentKey []byte + endKey []byte + + bufferKeys [][]byte + bufferValues [][]byte + bufferCursor int + noMore bool +} + +func newRawKVScanner(client *tikv.RawKVClient, startKey, endKey []byte) *rawKVScanner { + return &rawKVScanner{ + client: client, + batchSize: defaultScanBatchSize, + + currentKey: startKey, + endKey: endKey, + + noMore: false, + } +} + +func (s *rawKVScanner) Next() ([]byte, []byte, error) { + if s.bufferCursor >= len(s.bufferKeys) { + if s.noMore { + return nil, nil, nil + } + + s.bufferCursor = 0 + + batchSize := s.batchSize + var err error + s.bufferKeys, s.bufferValues, err = s.client.Scan(s.currentKey, s.endKey, batchSize) + if err != nil { + return nil, nil, errors.Trace(err) + } + + if len(s.bufferKeys) < batchSize { + s.noMore = true + } + + if len(s.bufferKeys) == 0 { + return nil, nil, nil + } + + bufferKey := s.bufferKeys[len(s.bufferKeys)-1] + bufferKey = append(bufferKey, 0) + s.currentKey = bufferKey + } + + key := s.bufferKeys[s.bufferCursor] + value := s.bufferValues[s.bufferCursor] + s.bufferCursor++ + return key, value, nil +} diff --git a/tests/br_rawkv/run.sh b/tests/br_rawkv/run.sh new file mode 100644 index 000000000..a3f62311f --- /dev/null +++ b/tests/br_rawkv/run.sh @@ -0,0 +1,52 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu + +BACKUP_DIR="raw_backup" + +# generate raw kv randomly in range[start-key, end-key) in 10s +bin/rawkv --pd $PD_ADDR --mode rand-gen --start-key 31 --end-key 3130303030303030 --duration 10 + +# output checksum +bin/rawkv --pd $PD_ADDR --mode checksum --start-key 31 --end-key 3130303030303030 > /$TEST_DIR/checksum.out + +checksum_ori=$(cat /$TEST_DIR/checksum.out | grep result | awk '{print $3}') + +# backup rawkv +echo "backup start..." +run_br --pd $PD_ADDR backup raw -s "local://$TEST_DIR/$BACKUP_DIR" --start 31 --end 3130303030303030 --format hex --concurrency 4 + +# delete data in range[start-key, end-key) +bin/rawkv --pd $PD_ADDR --mode delete --start-key 31 --end-key 3130303030303030 + +# TODO: Finish check after restore ready +# restore rawkv +# echo "restore start..." +# run_br --pd $PD_ADDR restore raw -s "local://$TEST_DIR/$BACKUP_DIR" --start 31 --end 3130303030303030 --format hex --concurrency 4 + +# output checksum after restore +# bin/rawkv --pd $PD_ADDR --mode checksum --start-key 31 --end-key 3130303030303030 > /$TEST_DIR/checksum.out + +checksum_new=$(cat /$TEST_DIR/checksum.out | grep result | awk '{print $3}') + +if [ "$checksum_ori" == "$checksum_new" ];then + echo "TEST: [$TEST_NAME] successed!" +else + echo "TEST: [$TEST_NAME] failed!" + exit 1 +fi + +