From 194a4ff33ff151bb44795ff1ddff5f54d091b05f Mon Sep 17 00:00:00 2001
From: 3pointer
Date: Thu, 2 Apr 2020 13:04:37 +0800
Subject: [PATCH] Merge master to Release 4.0 (#206)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* restore: merge tidb-tools/pkg/restore-util (#146)
* restore-util: Implement split/scatter (#274)
* implement split/scatter Signed-off-by: 5kbpers
* init test Signed-off-by: 5kbpers
* redesign output/input of the lib Signed-off-by: 5kbpers
* update dependency Signed-off-by: 5kbpers
* add commments and more tests Signed-off-by: 5kbpers
* add ScanRegions interface to Client Signed-off-by: 5kbpers
* fix potential data race Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* Apply suggestions from code review Co-Authored-By: kennytm
* Update pkg/restore-util/client.go Co-Authored-By: kennytm
* address comments Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* update dependency Signed-off-by: 5kbpers
* resolve conflicts Signed-off-by: 5kbpers
* fix prefix rewrite Signed-off-by: 5kbpers
* add RewriteRule/skip failed scatter region/retry the SplitRegion Signed-off-by: 5kbpers
* fix test Signed-off-by: 5kbpers
* check if region has peer Signed-off-by: 5kbpers
* more logs Signed-off-by: 5kbpers
* restore-util: add split retry interval (#277)
* reset dependencies to release-3.1
* add split retry interval Signed-off-by: 5kbpers
* fix go.sum Signed-off-by: 5kbpers
* restore-util: wait for scatter region sequentially (#279)
* wait for scatter region sequentially Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* restore-util: add on split hook (#281)
* restore-util: add on split hook Signed-off-by: Neil Shen
* Nil check onSplit Co-Authored-By: kennytm
* restore-util: fix returned new region is nil (#283)
* restore-util: fix returned new region is nil Signed-off-by: 5kbpers
* more logs Signed-off-by: 5kbpers
* *: gofmt Signed-off-by: 5kbpers
* Apply suggestions from code review Co-Authored-By: kennytm
* fix log Signed-off-by: 5kbpers
* restore-util: call onSplit on splitByRewriteRules (#285) Signed-off-by: Neil Shen
* restore-util: fix overlapped error message (#293)
* restore-util: fix overlapped error message Signed-off-by: 5kbpers
* fix log message Signed-off-by: 5kbpers
* reduce error trace Signed-off-by: 5kbpers
* fix test Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* restore-util: log warning when cannot find matched rewrite rule (#299)
* restore-util: add method to set placement rules and store labels (#301)
* restore-util: add method to set placement rules and store labels Signed-off-by: disksing
* minor fix Signed-off-by: disksing
* address comment Signed-off-by: disksing
* add GetPlacementRules Signed-off-by: disksing
* fix test Signed-off-by: disksing
* restore-util: support batch split (#300)
* restore-util: support batch split Signed-off-by: 5kbpers
* go fmt Signed-off-by: 5kbpers
* Apply suggestions from code review Co-Authored-By: kennytm
* address commits Signed-off-by: 5kbpers
* Update pkg/restore-util/split.go Co-Authored-By: kennytm
* add onSplit callback Signed-off-by: 5kbpers
* fix test Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* restore-util: add upper bound time for waiting for scatter (#305)
* restore: fix scatter regions failed Signed-off-by: 5kbpers
* add log Signed-off-by: 5kbpers
* stop waiting for scatter after 3min Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* restore-util: fix wrong url (#306) Signed-off-by: disksing
* restore-util: add warning about unmatched table id (#313)
* restore-util: support table partition Signed-off-by: 5kbpers
* fix log Signed-off-by: 5kbpers
* warn table id does not match Signed-off-by: 5kbpers
* add unit tests Signed-off-by: 5kbpers
* Apply suggestions from code review Co-Authored-By: Neil Shen
* fix compile error Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* fix test Signed-off-by: 5kbpers Co-authored-by: Ian Co-authored-by: Neil Shen
* *: prune tidb-tools Signed-off-by: Neil Shen
* restore: address linters suggestions Signed-off-by: Neil Shen
* restore: merge restoreutil into restore Signed-off-by: Neil Shen
* address comment Signed-off-by: Neil Shen Co-authored-by: 5kbpers <20279863+5kbpers@users.noreply.github.com> Co-authored-by: kennytm Co-authored-by: disksing Co-authored-by: Ian
* Fixed handling for a dbName that do not exist in the backup being restored (#148)
* Fixed handling for a dbName that do not exist in the backup being restored
* Fixed handling for a dbName that do not exist in the backup being restored
* validate: fix debug meta test ci (#153)
* validate: fix debug meta test ci
* *: extracts runBackup/runRestore in cmd into pkg/task (#156)
* *: extracts runBackup/runRestore in cmd into pkg/task Defines a "Config" structure to store the parsed flags. Use the "black-white-list" structure to define what tables/databases to backup/restore.
* go.mod: update tidb to v4.0.0-beta
* restore: fix restore summary log (#150) Co-authored-by: kennytm
* restore: enhance error handling (#152)
* restore: enhance error handling Signed-off-by: 5kbpers
* unit test Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* fix region epoch error Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* remove `Restore*` Signed-off-by: 5kbpers
* address lint Signed-off-by: 5kbpers
* add debug log Signed-off-by: 5kbpers
* Apply suggestions from code review Co-Authored-By: kennytm
* Update pkg/restore/import.go Co-Authored-By: kennytm
* fix retry error Signed-off-by: 5kbpers
* handle RegionNotFound error Signed-off-by: 5kbpers Co-authored-by: Neil Shen Co-authored-by: kennytm
* Incremental BR: support DDL (#155)
* support backup&restore ddl Signed-off-by: 5kbpers
* integration tests Signed-off-by: 5kbpers
* update kvproto Signed-off-by: 5kbpers
* fix integration tests Signed-off-by: 5kbpers
* reduce cyclomatic complexity of `runRestore` Signed-off-by: 5kbpers
* fix test Signed-off-by: 5kbpers
* add unit test Signed-off-by: 5kbpers
* fix tests Signed-off-by: 5kbpers
* disable fast checksum in incremental br Signed-off-by: 5kbpers
* fix no valid key error Signed-off-by: 5kbpers
* address lint Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* Reduce TiDB dependencies (#158)
* utils: exclude mock_cluster outside of unit test
* utils: remove unused ResultSetToStringSlice()
* *: abstract away dependencies of tidb/session into a Glue interface
* *: fix hound lint
* util,mock: move utils.MockCluster to mock.Cluster
* restore: fix test build failure Co-authored-by: 3pointer
* go.mod: update tidb (#168) Signed-off-by: Neil Shen
* BR support TLS (#161)
* *: support tls
* move tikv.driver to glue
* fix comments
* upgrade golangci and prepare for go 1.14 (#171) Signed-off-by: Neil Shen
* backup: add raw backup command (#101)
* backup: add raw backup command
* restore: speed up retry on not leader (#179)
* tests: stable cluster start up Signed-off-by: Neil Shen
* tests: fix unbound var Signed-off-by: Neil Shen
* restore: speed retry on not leader Signed-off-by: Neil Shen
* address comments Signed-off-by: Neil Shen
* tests: add --cacert flag Signed-off-by: Neil Shen
* make codecov green Signed-off-by: Neil Shen
* address comments Signed-off-by: Neil Shen
* conn, restore: paginate scan regions (#165)
* conn, restore: paginate scan regions Signed-off-by: Neil Shen
* tests: large timeout Signed-off-by: Neil Shen
* Batch restore (#167)
* *: unify Range and RangeTree Signed-off-by: Neil Shen
* restore: split restore files into small batch Signed-off-by: Neil Shen
* task: set default restore concurrency to 128 Signed-off-by: Neil Shen
* restore: unused table worker pool Signed-off-by: Neil Shen
* summary: sum up repeated duration and int Signed-off-by: Neil Shen
* rtree: move rtree from utils to pkg Signed-off-by: Neil Shen
* README, docker: add quick start (#181)
* README, docker: add quick start Signed-off-by: Neil Shen
* cmd: disable some TiDB log Signed-off-by: Neil Shen
* docker: build go-ycsb automatically Signed-off-by: Neil Shen
* cmd: add TODO about TiDB logs Signed-off-by: Neil Shen
* *: update tidb dependency build with go1.14 (#176)
* *: add license header (#182)
* rtree: move checkFile into backup Signed-off-by: Neil Shen
* *: add license header Signed-off-by: Neil Shen
* Update LICENSE.md Co-Authored-By: kennytm Co-authored-by: kennytm Co-authored-by: 3pointer
* conn: support not shutting down the storage when closing the connection (#185) Co-authored-by: 3pointer
* conn: use GetDomain to avoid some TiDB breaking changes (#186)
* conn: use GetDomain to avoid some TiDB breaking changes Signed-off-by: Neil Shen
* minor usability improvement Signed-off-by: Neil Shen Co-authored-by: kennytm
* fix check safepoint & unhide experimental features (#175)
* backup: check safepoint for last backup ts Signed-off-by: 5kbpers
* check lastbackupts > 0 Signed-off-by: 5kbpers
* unhide experimental features Signed-off-by: 5kbpers
* address comment Signed-off-by: 5kbpers
* Update tests/br_z_gc_safepoint/run.sh Co-Authored-By: kennytm Co-authored-by: kennytm
* support backupts (#172)
* support backupts
* address comment
* address comment
* fix space
* *: update pd deps to v4 (#184) Co-authored-by: 3pointer
* restore: support online restore (#114) Signed-off-by: disksing
* metrics: add grafana scripts (#140)
* add grafana scripts
* fix Co-authored-by: 3pointer Co-authored-by: glorv Co-authored-by: kennytm
* filter out all TiFlash nodes when retrieving lists of stores from PD (#187)
* conn: ignore nodes with label engine=tiflash
* conn: disallow TiFlash on restore, only skip TiFlash on backup
* Create integration test for S3 storage (#174)
* Fix summary log (#191)
* *: fix restore summary log after restore logic changed to files
* fix
* fix
* fix Co-authored-by: kennytm
* Implement Raw Restore (#104)
* Update kvproto
* Implement raw restore
* fix build
* Set range for file importer Signed-off-by: MyonKeminta
* Remove unnecessary comments Signed-off-by: MyonKeminta
* check cf and support multi ranges in BackupMeta Signed-off-by: MyonKeminta
* Check files' cf; address comments
* adjust structure to keep consistent with master
* Fix build Signed-off-by: MyonKeminta
* Fix build and make check, avoid accessing TiDB in rawkv mode
* Fix test Signed-off-by: MyonKeminta
* Fix tests Signed-off-by: MyonKeminta
* Fix broken logic after merging master
* Update pkg/task/restore_raw.go Co-Authored-By: Neil Shen
* Address comments
* Address comments
* Mark raw restore as experimental
* Fix build
* Address comments
* test: Add check for deleting data and partial backup
* Fix build
* Add license header
* fix ci
* fix ci Co-authored-by: MyonKeminta Co-authored-by: 3pointer Co-authored-by: Neil Shen Co-authored-by: pingcap-github-bot
* restore: remove tiflash replica before restore (#194)
* restore: remove tiflash replica before restore Signed-off-by: 5kbpers
* rename errSplit variable Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* check replica count by region info Signed-off-by: 5kbpers
* cleanup Signed-off-by: 5kbpers
* save tiflash replica count to backupmeta Signed-off-by: 5kbpers
* fix save crcxor Signed-off-by: 5kbpers
* fix decode the key of placement rule Signed-off-by: 5kbpers
* address lint Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* close domain after restoring tiflash-replica Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* address comments Signed-off-by: 5kbpers
* Update pkg/task/restore.go Co-Authored-By: 3pointer Co-authored-by: 3pointer
* summary: put summary log at last (#197)
* summary: put summary log at last
* fix switch sql
* *: abstract the progress channel (updateCh) into the glue package (#196)
* *: abstract the progress channel (updateCh) into the glue package
* restore: fix crash in truncateTS() when the bound is unlimited
* task: fix comment Co-authored-by: Ian
* *: refline logs (#189)
* tests: disable TLS test (#204) Signed-off-by: Neil Shen
* *: add S3 quick start and few enhancement of log (#202)
* README, docker: add quick start about S3 storage Signed-off-by: Neil Shen
* pkg/summary: make sure to output correct summary Signed-off-by: Neil Shen
* cmd, tests: log to terminal if BR_LOG_TO_TERM is set Signed-off-by: Neil Shen
* Update pkg/task/common.go Co-Authored-By: kennytm
* address comments Signed-off-by: Neil Shen
* address comments Signed-off-by: Neil Shen
* tests: cat log if br fails Signed-off-by: Neil Shen Co-authored-by: kennytm
* restore: add error field to `DownloadResponse` (#195)
* restore: add error field to `DownloadResponse` Signed-off-by: 5kbpers
* restore: populate restore cancel error (#207) Signed-off-by: Neil Shen Co-authored-by: kennytm
* enhance usability of br (#208)
* silenceUsage only when parse cmd flags failed
* udpate tidb Co-authored-by: kennytm
* task: do not run checksum if restore failed (#209)
* fix incremental bug in llroad test (#199)
* restore: filter same table ddl
* *: do not return error when backup/restore data is empty
* fix create database double during incremental restore
* add tests
* fix ci
* address comment
* add skip create sqls (#211)
* Revert "tests: disable TLS test (#204)" (#218) This reverts commit e168a60288ade404a97b47147dbc64ec0b63b9e6.
* doc: add `minio` to dependence list. (#221) The README of test omitted `minio` in the dependence list, which is needed for run the integration test.
Co-authored-by: Neil Shen * move waiting reject stores in import file (#222) * move wait rejectstores into import files * restore: use new table id to search placementRules * Update pkg/restore/import.go Co-Authored-By: Neil Shen * Update pkg/restore/import.go Co-Authored-By: kennytm * fix ci Co-authored-by: Neil Shen Co-authored-by: kennytm * Max index length (#220) * restore: set max-index-length to max * restore:add max-index-length params * address comment * address comment * glue: create schema/table directly with info (#216) * glue: create schema/table directly with info * go.mod: change to use the master version * gluetidb: fix failure to create schema * gluetidb: exclude non-public indices when restoring * go.mod: removed unused replace Co-authored-by: 3pointer Co-authored-by: Neil Shen Co-authored-by: 5kbpers <20279863+5kbpers@users.noreply.github.com> Co-authored-by: kennytm Co-authored-by: disksing Co-authored-by: Ian Co-authored-by: Kolbe Kegel Co-authored-by: WangXiangUSTC Co-authored-by: glorv Co-authored-by: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Co-authored-by: MyonKeminta Co-authored-by: pingcap-github-bot Co-authored-by: 庄天翼 Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com> --- README.md | 16 ++- cmd/backup.go | 4 +- cmd/cmd.go | 28 ++++-- cmd/restore.go | 5 +- docker-compose.yaml | 26 ++++- docker/Dockerfile | 9 +- docker/minio.env | 6 ++ go.mod | 8 +- go.sum | 23 +++-- main.go | 2 +- pkg/backup/client.go | 22 ++++- pkg/backup/push.go | 5 +- pkg/backup/schema.go | 7 +- pkg/backup/schema_test.go | 35 +++++-- pkg/glue/glue.go | 17 +++- pkg/gluetidb/glue.go | 41 +++++--- pkg/gluetikv/glue.go | 22 +++++ pkg/restore/backoff.go | 13 +-- pkg/restore/backoff_test.go | 6 +- pkg/restore/client.go | 57 +++++++---- pkg/restore/client_test.go | 4 + pkg/restore/db.go | 59 ++++------- pkg/restore/import.go | 58 ++++++----- pkg/restore/split.go | 70 ------------- pkg/restore/split_test.go | 2 +- pkg/restore/util.go | 114 ++++++++++++---------- pkg/summary/collector.go | 32 +++++- pkg/summary/collector_test.go | 1 + pkg/summary/summary.go | 5 + pkg/task/backup.go | 16 ++- pkg/task/backup_raw.go | 7 +- pkg/task/common.go | 2 +- pkg/task/restore.go | 103 ++++++++++++++----- pkg/task/restore_raw.go | 18 ++-- tests/README.md | 3 +- tests/_utils/run_services | 7 +- tests/br_alter_pk_server/config/tidb.toml | 8 ++ tests/br_alter_pk_server/config/tikv.toml | 14 +++ tests/br_alter_pk_server/run.sh | 42 ++++++++ tests/br_db_skip/run.sh | 72 ++++++++++++++ tests/br_full_ddl/run.sh | 5 +- tests/br_full_index/run.sh | 5 +- tests/br_incremental_only_ddl/run.sh | 72 ++++++++++++++ tests/br_incremental_same_table/run.sh | 86 ++++++++++++++++ tests/br_move_backup/run.sh | 9 ++ tests/run.sh | 1 + 46 files changed, 836 insertions(+), 331 deletions(-) create mode 100644 docker/minio.env create mode 100644 tests/br_alter_pk_server/config/tidb.toml create mode 100644 tests/br_alter_pk_server/config/tikv.toml create mode 100755 tests/br_alter_pk_server/run.sh create mode 100755 tests/br_db_skip/run.sh create mode 100755 tests/br_incremental_only_ddl/run.sh create mode 100755 tests/br_incremental_same_table/run.sh diff --git a/README.md b/README.md index 6207d98eb..408b09749 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ go-ycsb load mysql -p workload=core \ mysql -uroot -htidb -P4000 -E -e "SELECT COUNT(*) FROM test.usertable" # Build BR and backup! 
-make release && \ +make build && \ bin/br backup full --pd pd0:2379 --storage "local:///data/backup/full" \ --log-file "/logs/br_backup.log" @@ -69,6 +69,20 @@ bin/br restore full --pd pd0:2379 --storage "local:///data/backup/full" \ # How many rows do we get again? Expected to be 100000 rows. mysql -uroot -htidb -P4000 -E -e "SELECT COUNT(*) FROM test.usertable" + +# Test S3 compatible storage (MinIO). +# Create a bucket to save backup by mc (a MinIO Client). +mc config host add minio $S3_ENDPOINT $MINIO_ACCESS_KEY $MINIO_SECRET_KEY && \ +mc mb minio/mybucket + +# Backup to S3 compatible storage. +bin/br backup full --pd pd0:2379 --storage "s3://mybucket/full" \ + --s3.endpoint="$S3_ENDPOINT" + +# Drop database and restore! +mysql -uroot -htidb -P4000 -E -e "DROP DATABASE test; SHOW DATABASES;" && \ +bin/br restore full --pd pd0:2379 --storage "s3://mybucket/full" \ + --s3.endpoint="$S3_ENDPOINT" ``` ## Contributing diff --git a/cmd/backup.go b/cmd/backup.go index 3aed2147f..d37229e0a 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -16,6 +16,7 @@ import ( func runBackupCommand(command *cobra.Command, cmdName string) error { cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}} if err := cfg.ParseFromFlags(command.Flags()); err != nil { + command.SilenceUsage = false return err } return task.RunBackup(GetDefaultContext(), tidbGlue, cmdName, &cfg) @@ -24,6 +25,7 @@ func runBackupCommand(command *cobra.Command, cmdName string) error { func runBackupRawCommand(command *cobra.Command, cmdName string) error { cfg := task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}} if err := cfg.ParseFromFlags(command.Flags()); err != nil { + command.SilenceUsage = false return err } return task.RunBackupRaw(GetDefaultContext(), gluetikv.Glue{}, cmdName, &cfg) @@ -34,7 +36,7 @@ func NewBackupCommand() *cobra.Command { command := &cobra.Command{ Use: "backup", Short: "backup a TiDB/TiKV cluster", - SilenceUsage: false, + SilenceUsage: true, PersistentPreRunE: func(c *cobra.Command, args []string) error { if err := Init(c); err != nil { return err diff --git a/cmd/cmd.go b/cmd/cmd.go index 5b2801894..87a8aadc9 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -6,8 +6,11 @@ import ( "context" "net/http" "net/http/pprof" + "os" + "path/filepath" "sync" "sync/atomic" + "time" "github.com/pingcap/log" "github.com/pingcap/tidb/util/logutil" @@ -16,15 +19,17 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/gluetidb" + "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/task" "github.com/pingcap/br/pkg/utils" ) var ( - initOnce = sync.Once{} - defaultContext context.Context - hasLogFile uint64 - tidbGlue = gluetidb.Glue{} + initOnce = sync.Once{} + defaultContext context.Context + hasLogFile uint64 + tidbGlue = gluetidb.Glue{} + envLogToTermKey = "BR_LOG_TO_TERM" ) const ( @@ -41,6 +46,10 @@ const ( flagVersionShort = "V" ) +func timestampLogFileName() string { + return filepath.Join(os.TempDir(), "br.log."+time.Now().Format(time.RFC3339)) +} + // AddFlags adds flags to the given cmd. func AddFlags(cmd *cobra.Command) { cmd.Version = utils.BRInfo() @@ -49,8 +58,8 @@ func AddFlags(cmd *cobra.Command) { cmd.PersistentFlags().StringP(FlagLogLevel, "L", "info", "Set the log level") - cmd.PersistentFlags().String(FlagLogFile, "", - "Set the log file path. If not set, logs will output to stdout") + cmd.PersistentFlags().String(FlagLogFile, timestampLogFileName(), + "Set the log file path. 
If not set, logs will output to temp file") cmd.PersistentFlags().String(FlagStatusAddr, "", "Set the HTTP listening address for the status report service. Set to empty string to disable") task.DefineCommonFlags(cmd.PersistentFlags()) @@ -73,8 +82,15 @@ func Init(cmd *cobra.Command) (err error) { if err != nil { return } + _, outputLogToTerm := os.LookupEnv(envLogToTermKey) + if outputLogToTerm { + // Log to term if env `BR_LOG_TO_TERM` is set. + conf.File.Filename = "" + } if len(conf.File.Filename) != 0 { atomic.StoreUint64(&hasLogFile, 1) + summary.InitCollector(true) + cmd.Printf("Detial BR log in %s\n", conf.File.Filename) } lg, p, e := log.InitLogger(conf) if e != nil { diff --git a/cmd/restore.go b/cmd/restore.go index bc74bea84..1e894b4ee 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -15,6 +15,7 @@ import ( func runRestoreCommand(command *cobra.Command, cmdName string) error { cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}} if err := cfg.ParseFromFlags(command.Flags()); err != nil { + command.SilenceUsage = false return err } return task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg) @@ -25,6 +26,7 @@ func runRestoreRawCommand(command *cobra.Command, cmdName string) error { RawKvConfig: task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}}, } if err := cfg.ParseFromFlags(command.Flags()); err != nil { + command.SilenceUsage = false return err } return task.RunRestoreRaw(GetDefaultContext(), gluetikv.Glue{}, cmdName, &cfg) @@ -33,6 +35,7 @@ func runRestoreRawCommand(command *cobra.Command, cmdName string) error { func runRestoreTiflashReplicaCommand(command *cobra.Command, cmdName string) error { cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}} if err := cfg.ParseFromFlags(command.Flags()); err != nil { + command.SilenceUsage = false return err } @@ -44,7 +47,7 @@ func NewRestoreCommand() *cobra.Command { command := &cobra.Command{ Use: "restore", Short: "restore a TiDB/TiKV cluster", - SilenceUsage: false, + SilenceUsage: true, PersistentPreRunE: func(c *cobra.Command, args []string) error { if err := Init(c); err != nil { return err diff --git a/docker-compose.yaml b/docker-compose.yaml index 4d84c67fa..ab6360d6d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,6 +1,6 @@ --- # Source: tidb-docker-compose/templates/docker-compose.yml -version: '2.1' +version: '3.2' services: control: @@ -10,11 +10,13 @@ services: dockerfile: ./docker/Dockerfile volumes: - ./docker/data:/data - - ./docker/logs:/logs + - ./docker/logs:/tmp command: -c "/usr/bin/tail -f /dev/null" depends_on: - "tidb" restart: on-failure + env_file: + - ./docker/minio.env pd0: image: pingcap/pd:latest @@ -64,6 +66,8 @@ services: # soft: 1000000 # hard: 1000000 restart: on-failure + env_file: + - ./docker/minio.env tikv1: image: pingcap/tikv:latest @@ -87,6 +91,8 @@ services: # soft: 1000000 # hard: 1000000 restart: on-failure + env_file: + - ./docker/minio.env tikv2: image: pingcap/tikv:latest @@ -110,6 +116,8 @@ services: # soft: 1000000 # hard: 1000000 restart: on-failure + env_file: + - ./docker/minio.env tikv3: image: pingcap/tikv:latest @@ -133,6 +141,8 @@ services: # soft: 1000000 # hard: 1000000 restart: on-failure + env_file: + - ./docker/minio.env tikv4: image: pingcap/tikv:latest @@ -156,6 +166,8 @@ services: # soft: 1000000 # hard: 1000000 restart: on-failure + env_file: + - ./docker/minio.env tidb: image: pingcap/tidb:latest @@ -185,6 +197,16 @@ services: # hard: 1000000 restart: on-failure + minio: + image: 
minio/minio + ports: + - 24927:24927 + volumes: + - ./docker/data/s3:/data/s3 + command: server --address=:24927 /data/s3 + env_file: + - ./docker/minio.env + tidb-vision: image: pingcap/tidb-vision:latest environment: diff --git a/docker/Dockerfile b/docker/Dockerfile index c93d22ab4..14c577fcf 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,11 +1,13 @@ -FROM golang:1.13.8-buster as builder - # For loading data to TiDB +FROM golang:1.13.8-buster as go-ycsb-builder WORKDIR /go/src/github.com/pingcap/ RUN git clone https://github.com/pingcap/go-ycsb.git && \ cd go-ycsb && \ make +# For operating minio S3 compatible storage +FROM minio/mc as mc-builder + FROM golang:1.13.8-buster RUN apt-get update && apt-get install -y --no-install-recommends \ @@ -19,6 +21,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ WORKDIR /go/src/github.com/pingcap/br COPY . . -COPY --from=builder /go/src/github.com/pingcap/go-ycsb/bin/go-ycsb /go/bin/go-ycsb +COPY --from=go-ycsb-builder /go/src/github.com/pingcap/go-ycsb/bin/go-ycsb /go/bin/go-ycsb +COPY --from=mc-builder /usr/bin/mc /usr/bin/mc ENTRYPOINT ["/bin/bash"] diff --git a/docker/minio.env b/docker/minio.env new file mode 100644 index 000000000..d865b2474 --- /dev/null +++ b/docker/minio.env @@ -0,0 +1,6 @@ +MINIO_ACCESS_KEY=brs3accesskey +MINIO_SECRET_KEY=brs3secretkey +MINIO_BROWSER=off +AWS_ACCESS_KEY_ID=brs3accesskey +AWS_SECRET_ACCESS_KEY=brs3secretkey +S3_ENDPOINT=http://minio:24927 diff --git a/go.mod b/go.mod index 94f4022f9..5d3251f08 100644 --- a/go.mod +++ b/go.mod @@ -21,11 +21,11 @@ require ( github.com/onsi/gomega v1.8.1 // indirect github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011 - github.com/pingcap/kvproto v0.0.0-20200317112120-78042b285b75 + github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904 github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd - github.com/pingcap/parser v0.0.0-20200305120128-bde9faa0df84 + github.com/pingcap/parser v0.0.0-20200326020624-68d423641be5 github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3 - github.com/pingcap/tidb v1.1.0-beta.0.20200310133602-7c39e5e5e0bc + github.com/pingcap/tidb v0.0.0-20200401141416-959eca8f3a39 github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200306084441-875bd09aa3d5+incompatible github.com/pingcap/tipb v0.0.0-20200212061130-c4d518eb1d60 github.com/prometheus/client_golang v1.0.0 @@ -37,7 +37,7 @@ require ( github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 go.opencensus.io v0.22.2 // indirect - go.uber.org/zap v1.14.0 + go.uber.org/zap v1.14.1 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 google.golang.org/api v0.14.0 google.golang.org/grpc v1.25.1 diff --git a/go.sum b/go.sum index 31fd50bcc..06ea01c73 100644 --- a/go.sum +++ b/go.sum @@ -363,16 +363,14 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200214064158-62d31900d88e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20200221034943-a2aa1d1e20a8/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200228095611-2cf9a243b8d5 h1:knEvP4R5v5b2T107/Q6VzB0C8/6T7NXB/V7Vl1FtQsg= -github.com/pingcap/kvproto 
v0.0.0-20200228095611-2cf9a243b8d5/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20200317112120-78042b285b75 h1:DB3NTM0ilba/6sW+vccdEnP10bVvrVunDwWvRa0hSKc= -github.com/pingcap/kvproto v0.0.0-20200317112120-78042b285b75/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904 h1:pMFUXvhJ62hX8m0Q4RsL7L+hSW1mAMG26So5eFMoAtI= +github.com/pingcap/kvproto v0.0.0-20200330093347-98f910b71904/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd h1:CV3VsP3Z02MVtdpTMfEgRJ4T9NGgGTxdHpJerent7rM= github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20200305120128-bde9faa0df84 h1:u5FOwUw9muF8mBTZVV1dQhoAKiEo2Ci54CxN9XchEEY= -github.com/pingcap/parser v0.0.0-20200305120128-bde9faa0df84/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4= +github.com/pingcap/parser v0.0.0-20200326020624-68d423641be5 h1:fXVqoeYfV+xI8K2he5NNv00c6YksrjeM6+vkNo1ZK2Q= +github.com/pingcap/parser v0.0.0-20200326020624-68d423641be5/go.mod h1:9v0Edh8IbgjGYW2ArJr19E+bvL8zKahsFp+ixWeId+4= github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3 h1:Yrp99FnjHAEuDrSBql2l0IqCtJX7KwJbTsD5hIArkvk= github.com/pingcap/pd/v4 v4.0.0-beta.1.0.20200305072537-61d9f9cc35d3/go.mod h1:25GfNw6+Jcr9kca5rtmTb4gKCJ4jOpow2zV2S9Dgafs= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= @@ -380,8 +378,8 @@ github.com/pingcap/sysutil v0.0.0-20200302022240-21c8c70d0ab1 h1:YUnUZ914SHFMsOS github.com/pingcap/sysutil v0.0.0-20200302022240-21c8c70d0ab1/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20200309085538-962fd285f3bb h1:bDbgLaNTRNK6Qw7KjvEqqfCQstY8WMEcXyXTU7yzYKg= github.com/pingcap/sysutil v0.0.0-20200309085538-962fd285f3bb/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= -github.com/pingcap/tidb v1.1.0-beta.0.20200310133602-7c39e5e5e0bc h1:1aW3qTRJZjnosvXt1b75KL73b28XRJWBx6jtTtHsybg= -github.com/pingcap/tidb v1.1.0-beta.0.20200310133602-7c39e5e5e0bc/go.mod h1:WTmfs5zrUGMPw3Enn5FI3buzkU8BDuJ6BhsO/JC239U= +github.com/pingcap/tidb v0.0.0-20200401141416-959eca8f3a39 h1:nYRL69Qc4kuvp+tlDNB5wXjvDetX0J7g0DsW4RQxfXM= +github.com/pingcap/tidb v0.0.0-20200401141416-959eca8f3a39/go.mod h1:btnHsqUQvJnY18+OP2Z6MCRq1tX4B8JUCrmqctSKxOg= github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200306084441-875bd09aa3d5+incompatible h1:84F7MFMfdAYObrznvRslmVu43aoihrlL+7mMyMlOi0o= github.com/pingcap/tidb-tools v4.0.0-beta.1.0.20200306084441-875bd09aa3d5+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= @@ -501,6 +499,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yookoala/realpath v1.0.0 h1:7OA9pj4FZd+oZDsyvXWQvjn5oBdcHRTV44PpdMSuImQ= github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE= +github.com/yuin/goldmark v1.1.25/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= @@ -535,8 +534,8 @@ go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.12.0 h1:dySoUQPFBGj6xwjmBzageVL8jGi8uxc6bEmJQjA06bw= go.uber.org/zap v1.12.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= -go.uber.org/zap v1.14.0 h1:/pduUoebOeeJzTDFuoMgC6nRkiasr1sBCIEorly7m4o= -go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= +go.uber.org/zap v1.14.1 h1:nYDKopTbvAPq/NrUVZwT15y2lpROBiLLyoRTbXOYWOo= +go.uber.org/zap v1.14.1/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -676,8 +675,8 @@ golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200301222351-066e0c02454c h1:FD7jysxM+EJqg5UYYy3XYDsAiUickFsn4UiaanJkf8c= golang.org/x/tools v0.0.0-20200301222351-066e0c02454c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb h1:iKlO7ROJc6SttHKlxzwGytRtBUqX4VARrNTgP2YLX5M= -golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw= +golang.org/x/tools v0.0.0-20200325203130-f53864d0dba1 h1:odiryKYJy7CjdrZxhrcE1Z8L9+kGyGZOnfpuauvdCeU= +golang.org/x/tools v0.0.0-20200325203130-f53864d0dba1/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/main.go b/main.go index 7b7cbfc97..4b369785f 100644 --- a/main.go +++ b/main.go @@ -41,7 +41,7 @@ func main() { Use: "br", Short: "br is a TiDB/TiKV cluster backup restore tool.", TraverseChildren: true, - SilenceUsage: false, + SilenceUsage: true, } cmd.AddFlags(rootCmd) cmd.SetDefaultContext(ctx) diff --git a/pkg/backup/client.go b/pkg/backup/client.go index 9693b6b5f..72563096b 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -32,6 +32,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/conn" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" @@ -223,6 +224,16 @@ func BuildBackupRangeAndSchema( zap.Stringer("table", tableInfo.Name), zap.Int64("AutoIncID", globalAutoID)) + // remove all non-public indices + n := 0 + for _, index := range tableInfo.Indices { + if index.State == model.StatePublic { + tableInfo.Indices[n] = index + n++ + } + } + tableInfo.Indices = tableInfo.Indices[:n] + if dbData == nil { dbData, err = json.Marshal(dbInfo) if err != nil { @@ -254,7 
+265,8 @@ func BuildBackupRangeAndSchema( } if backupSchemas.Len() == 0 { - return nil, nil, errors.New("nothing to backup") + log.Info("nothing to backup") + return nil, nil, nil } return ranges, backupSchemas, nil } @@ -309,7 +321,7 @@ func (bc *Client) BackupRanges( ctx context.Context, ranges []rtree.Range, req kvproto.BackupRequest, - updateCh chan<- struct{}, + updateCh glue.Progress, ) error { start := time.Now() defer func() { @@ -374,7 +386,7 @@ func (bc *Client) BackupRange( ctx context.Context, startKey, endKey []byte, req kvproto.BackupRequest, - updateCh chan<- struct{}, + updateCh glue.Progress, ) (err error) { start := time.Now() defer func() { @@ -486,7 +498,7 @@ func (bc *Client) fineGrainedBackup( rateLimit uint64, concurrency uint32, rangeTree rtree.RangeTree, - updateCh chan<- struct{}, + updateCh glue.Progress, ) error { bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff) for { @@ -561,7 +573,7 @@ func (bc *Client) fineGrainedBackup( rangeTree.Put(resp.StartKey, resp.EndKey, resp.Files) // Update progress - updateCh <- struct{}{} + updateCh.Inc() } } diff --git a/pkg/backup/push.go b/pkg/backup/push.go index 4aaffa7e2..d329f7088 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/log" "go.uber.org/zap" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" ) @@ -38,7 +39,7 @@ func newPushDown(ctx context.Context, mgr ClientMgr, cap int) *pushDown { func (push *pushDown) pushBackup( req backup.BackupRequest, stores []*metapb.Store, - updateCh chan<- struct{}, + updateCh glue.Progress, ) (rtree.RangeTree, error) { // Push down backup tasks to all tikv instances. res := rtree.NewRangeTree() @@ -90,7 +91,7 @@ func (push *pushDown) pushBackup( resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles()) // Update progress - updateCh <- struct{}{} + updateCh.Inc() } else { errPb := resp.GetError() switch v := errPb.Detail.(type) { diff --git a/pkg/backup/schema.go b/pkg/backup/schema.go index 18583d094..73a62477d 100644 --- a/pkg/backup/schema.go +++ b/pkg/backup/schema.go @@ -18,6 +18,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/br/pkg/checksum" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" ) @@ -67,7 +68,7 @@ func (pending *Schemas) Start( store kv.Storage, backupTS uint64, concurrency uint, - updateCh chan<- struct{}, + updateCh glue.Progress, ) { workerPool := utils.NewWorkerPool(concurrency, "Schemas") go func() { @@ -82,7 +83,7 @@ func (pending *Schemas) Start( if pending.skipChecksum { pending.backupSchemaCh <- schema - updateCh <- struct{}{} + updateCh.Inc() return } @@ -110,7 +111,7 @@ func (pending *Schemas) Start( zap.Duration("take", time.Since(start))) pending.backupSchemaCh <- schema - updateCh <- struct{}{} + updateCh.Inc() }) } pending.wg.Wait() diff --git a/pkg/backup/schema_test.go b/pkg/backup/schema_test.go index 3b3bef897..98173dd55 100644 --- a/pkg/backup/schema_test.go +++ b/pkg/backup/schema_test.go @@ -5,6 +5,7 @@ package backup import ( "context" "math" + "sync/atomic" . 
"github.com/pingcap/check" "github.com/pingcap/tidb-tools/pkg/filter" @@ -30,6 +31,24 @@ func (s *testBackupSchemaSuite) TearDownSuite(c *C) { testleak.AfterTest(c)() } +type simpleProgress struct { + counter int64 +} + +func (sp *simpleProgress) Inc() { + atomic.AddInt64(&sp.counter, 1) +} + +func (sp *simpleProgress) Close() {} + +func (sp *simpleProgress) reset() { + atomic.StoreInt64(&sp.counter, 0) +} + +func (sp *simpleProgress) get() int64 { + return atomic.LoadInt64(&sp.counter) +} + func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { c.Assert(s.mock.Start(), IsNil) defer s.mock.Stop() @@ -43,7 +62,7 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { c.Assert(err, IsNil) _, backupSchemas, err := BuildBackupRangeAndSchema( s.mock.Domain, s.mock.Storage, testFilter, math.MaxUint64) - c.Assert(err, NotNil) + c.Assert(err, IsNil) c.Assert(backupSchemas, IsNil) // Database is not exist. @@ -53,15 +72,15 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { c.Assert(err, IsNil) _, backupSchemas, err = BuildBackupRangeAndSchema( s.mock.Domain, s.mock.Storage, fooFilter, math.MaxUint64) - c.Assert(err, NotNil) + c.Assert(err, IsNil) c.Assert(backupSchemas, IsNil) - // Empty databse. + // Empty database. noFilter, err := filter.New(false, &filter.Rules{}) c.Assert(err, IsNil) _, backupSchemas, err = BuildBackupRangeAndSchema( s.mock.Domain, s.mock.Storage, noFilter, math.MaxUint64) - c.Assert(err, NotNil) + c.Assert(err, IsNil) c.Assert(backupSchemas, IsNil) tk.MustExec("use test") @@ -73,10 +92,10 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { s.mock.Domain, s.mock.Storage, testFilter, math.MaxUint64) c.Assert(err, IsNil) c.Assert(backupSchemas.Len(), Equals, 1) - updateCh := make(chan struct{}, 2) + updateCh := new(simpleProgress) backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 1, updateCh) schemas, err := backupSchemas.finishTableChecksum() - <-updateCh + c.Assert(updateCh.get(), Equals, int64(1)) c.Assert(err, IsNil) c.Assert(len(schemas), Equals, 1) // Cluster returns a dummy checksum (all fields are 1). @@ -93,10 +112,10 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { s.mock.Domain, s.mock.Storage, noFilter, math.MaxUint64) c.Assert(err, IsNil) c.Assert(backupSchemas.Len(), Equals, 2) + updateCh.reset() backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 2, updateCh) schemas, err = backupSchemas.finishTableChecksum() - <-updateCh - <-updateCh + c.Assert(updateCh.get(), Equals, int64(2)) c.Assert(err, IsNil) c.Assert(len(schemas), Equals, 2) // Cluster returns a dummy checksum (all fields are 1). diff --git a/pkg/glue/glue.go b/pkg/glue/glue.go index f2f3ff55e..8e5bb8577 100644 --- a/pkg/glue/glue.go +++ b/pkg/glue/glue.go @@ -9,7 +9,6 @@ import ( pd "github.com/pingcap/pd/v4/client" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta/autoid" ) // Glue is an abstraction of TiDB function calls used in BR. @@ -21,12 +20,24 @@ type Glue interface { // OwnsStorage returns whether the storage returned by Open() is owned // If this method returns false, the connection manager will never close the storage. OwnsStorage() bool + + StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) Progress } // Session is an abstraction of the session.Session interface. 
type Session interface { Execute(ctx context.Context, sql string) error - ShowCreateDatabase(schema *model.DBInfo) (string, error) - ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error) + CreateDatabase(ctx context.Context, schema *model.DBInfo) error + CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error + Close() +} + +// Progress is an interface recording the current execution progress. +type Progress interface { + // Inc increases the progress. This method must be goroutine-safe, and can + // be called from any goroutine. + Inc() + // Close marks the progress as 100% complete and that Inc() can no longer be + // called. Close() } diff --git a/pkg/gluetidb/glue.go b/pkg/gluetidb/glue.go index 80756d2c2..73ef66e4f 100644 --- a/pkg/gluetidb/glue.go +++ b/pkg/gluetidb/glue.go @@ -3,15 +3,14 @@ package gluetidb import ( - "bytes" "context" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" pd "github.com/pingcap/pd/v4/client" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/session" "github.com/pingcap/br/pkg/glue" @@ -51,28 +50,40 @@ func (Glue) OwnsStorage() bool { return true } +// StartProgress implements glue.Glue +func (g Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { + return g.tikvGlue.StartProgress(ctx, cmdName, total, redirectLog) +} + // Execute implements glue.Session func (gs *tidbSession) Execute(ctx context.Context, sql string) error { _, err := gs.se.Execute(ctx, sql) return err } -// ShowCreateDatabase implements glue.Session -func (gs *tidbSession) ShowCreateDatabase(schema *model.DBInfo) (string, error) { - var buf bytes.Buffer - if err := executor.ConstructResultOfShowCreateDatabase(gs.se, schema, true, &buf); err != nil { - return "", err +// CreateDatabase implements glue.Session +func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { + d := domain.GetDomain(gs.se).DDL() + schema = schema.Clone() + if len(schema.Charset) == 0 { + schema.Charset = mysql.DefaultCharset } - return buf.String(), nil + return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore, true) } -// ShowCreateTable implements glue.Session -func (gs *tidbSession) ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error) { - var buf bytes.Buffer - if err := executor.ConstructResultOfShowCreateTable(gs.se, table, allocator, &buf); err != nil { - return "", err +// CreateTable implements glue.Session +func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { + d := domain.GetDomain(gs.se).DDL() + + // Clone() does not clone partitions yet :( + table = table.Clone() + if table.Partition != nil { + newPartition := *table.Partition + newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...) 
+ table.Partition = &newPartition } - return buf.String(), nil + + return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore, true) } // Close implements glue.Session diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go index e63b35b95..ab3c0d57f 100644 --- a/pkg/gluetikv/glue.go +++ b/pkg/gluetikv/glue.go @@ -3,6 +3,8 @@ package gluetikv import ( + "context" + pd "github.com/pingcap/pd/v4/client" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" @@ -10,6 +12,7 @@ import ( "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/br/pkg/glue" + "github.com/pingcap/br/pkg/utils" ) // Glue is an implementation of glue.Glue that accesses only TiKV without TiDB. @@ -41,3 +44,22 @@ func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) { func (Glue) OwnsStorage() bool { return true } + +// StartProgress implements glue.Glue +func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { + return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog)} +} + +type progress struct { + ch chan<- struct{} +} + +// Inc implements glue.Progress +func (p progress) Inc() { + p.ch <- struct{}{} +} + +// Close implements glue.Progress +func (p progress) Close() { + close(p.ch) +} diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go index 21048dd13..a84014c11 100644 --- a/pkg/restore/backoff.go +++ b/pkg/restore/backoff.go @@ -15,18 +15,11 @@ import ( var ( errEpochNotMatch = errors.NewNoStackError("epoch not match") errKeyNotInRegion = errors.NewNoStackError("key not in region") - errRegionNotFound = errors.NewNoStackError("region not found") - errResp = errors.NewNoStackError("response error") errRewriteRuleNotFound = errors.NewNoStackError("rewrite rule not found") errRangeIsEmpty = errors.NewNoStackError("range is empty") errGrpc = errors.NewNoStackError("gRPC error") - - // TODO: add `error` field to `DownloadResponse` for distinguish the errors of gRPC - // and the errors of request - errBadFormat = errors.NewNoStackError("bad format") - errWrongKeyPrefix = errors.NewNoStackError("wrong key prefix") - errFileCorrupted = errors.NewNoStackError("file corrupted") - errCannotRead = errors.NewNoStackError("cannot read externel storage") + errDownloadFailed = errors.NewNoStackError("download sst failed") + errIngestFailed = errors.NewNoStackError("ingest sst failed") ) const ( @@ -67,7 +60,7 @@ func newDownloadSSTBackoffer() utils.Backoffer { func (bo *importerBackoffer) NextBackoff(err error) time.Duration { switch errors.Cause(err) { - case errResp, errGrpc, errEpochNotMatch: + case errGrpc, errEpochNotMatch, errIngestFailed: bo.delayTime = 2 * bo.delayTime bo.attempt-- case errRangeIsEmpty, errRewriteRuleNotFound: diff --git a/pkg/restore/backoff_test.go b/pkg/restore/backoff_test.go index 11accedd2..a07c0839b 100644 --- a/pkg/restore/backoff_test.go +++ b/pkg/restore/backoff_test.go @@ -37,7 +37,7 @@ func (s *testBackofferSuite) TestImporterBackoffer(c *C) { case 0: return errGrpc case 1: - return errResp + return errEpochNotMatch case 2: return errRangeIsEmpty } @@ -54,8 +54,8 @@ func (s *testBackofferSuite) TestImporterBackoffer(c *C) { } err = utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() - return errResp + return errEpochNotMatch }, &backoffer) c.Assert(counter, Equals, 10) - c.Assert(err, Equals, errResp) + c.Assert(err, Equals, errEpochNotMatch) } diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 2453f2974..fde382fb0 100644 --- 
a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -64,6 +64,7 @@ type Client struct { db *DB rateLimit uint64 isOnline bool + noSchema bool hasSpeedLimited bool restoreStores []uint64 @@ -305,6 +306,10 @@ func (rc *Client) GetTableSchema( // CreateDatabase creates a database. func (rc *Client) CreateDatabase(db *model.DBInfo) error { + if rc.IsSkipCreateSQL() { + log.Info("skip create database", zap.Stringer("database", db.Name)) + return nil + } return rc.db.CreateDatabase(rc.ctx, db) } @@ -320,9 +325,13 @@ func (rc *Client) CreateTables( } newTables := make([]*model.TableInfo, 0, len(tables)) for _, table := range tables { - err := rc.db.CreateTable(rc.ctx, table) - if err != nil { - return nil, nil, err + if rc.IsSkipCreateSQL() { + log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) + } else { + err := rc.db.CreateTable(rc.ctx, table) + if err != nil { + return nil, nil, err + } } newTableInfo, err := rc.GetTableSchema(dom, table.Db.Name, table.Info.Name) if err != nil { @@ -338,15 +347,18 @@ func (rc *Client) CreateTables( // RemoveTiFlashReplica removes all the tiflash replicas of a table // TODO: remove this after tiflash supports restore -func (rc *Client) RemoveTiFlashReplica(tables []*utils.Table, placementRules []placement.Rule) error { +func (rc *Client) RemoveTiFlashReplica( + tables []*utils.Table, newTables []*model.TableInfo, placementRules []placement.Rule) error { schemas := make([]*backup.Schema, 0, len(tables)) var updateReplica bool - for _, table := range tables { - if rule := utils.SearchPlacementRule(table.Info.ID, placementRules, placement.Learner); rule != nil { + // must use new table id to search placement rules + // here newTables and tables must have same order + for i, table := range tables { + if rule := utils.SearchPlacementRule(newTables[i].ID, placementRules, placement.Learner); rule != nil { table.TiFlashReplicas = rule.Count updateReplica = true } - tableData, err := json.Marshal(table.Info) + tableData, err := json.Marshal(newTables[i]) if err != nil { return errors.Trace(err) } @@ -445,7 +457,8 @@ func (rc *Client) setSpeedLimit() error { func (rc *Client) RestoreFiles( files []*backup.File, rewriteRules *RewriteRules, - updateCh chan<- struct{}, + rejectStoreMap map[uint64]bool, + updateCh glue.Progress, ) (err error) { start := time.Now() defer func() { @@ -476,9 +489,9 @@ func (rc *Client) RestoreFiles( defer wg.Done() select { case <-rc.ctx.Done(): - errCh <- nil - case errCh <- rc.fileImporter.Import(fileReplica, rewriteRules): - updateCh <- struct{}{} + errCh <- rc.ctx.Err() + case errCh <- rc.fileImporter.Import(fileReplica, rejectStoreMap, rewriteRules): + updateCh.Inc() } }) } @@ -499,7 +512,7 @@ func (rc *Client) RestoreFiles( } // RestoreRaw tries to restore raw keys in the specified range. 
-func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh chan<- struct{}) error { +func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.File, updateCh glue.Progress) error { start := time.Now() defer func() { elapsed := time.Since(start) @@ -527,9 +540,9 @@ func (rc *Client) RestoreRaw(startKey []byte, endKey []byte, files []*backup.Fil defer wg.Done() select { case <-rc.ctx.Done(): - errCh <- nil - case errCh <- rc.fileImporter.Import(fileReplica, emptyRules): - updateCh <- struct{}{} + errCh <- rc.ctx.Err() + case errCh <- rc.fileImporter.Import(fileReplica, nil, emptyRules): + updateCh.Inc() } }) } @@ -617,7 +630,7 @@ func (rc *Client) ValidateChecksum( kvClient kv.Client, tables []*utils.Table, newTables []*model.TableInfo, - updateCh chan<- struct{}, + updateCh glue.Progress, ) error { start := time.Now() defer func() { @@ -674,7 +687,7 @@ func (rc *Client) ValidateChecksum( return } - updateCh <- struct{}{} + updateCh.Inc() }) } wg.Wait() @@ -847,3 +860,13 @@ func (rc *Client) IsIncremental() bool { return !(rc.backupMeta.StartVersion == rc.backupMeta.EndVersion || rc.backupMeta.StartVersion == 0) } + +// EnableSkipCreateSQL sets switch of skip create schema and tables +func (rc *Client) EnableSkipCreateSQL() { + rc.noSchema = true +} + +// IsSkipCreateSQL returns whether we need skip create schema and tables in restore +func (rc *Client) IsSkipCreateSQL() bool { + return rc.noSchema +} diff --git a/pkg/restore/client_test.go b/pkg/restore/client_test.go index 3f8cb71f8..13b5caa0a 100644 --- a/pkg/restore/client_test.go +++ b/pkg/restore/client_test.go @@ -72,6 +72,10 @@ func (s *testRestoreClientSuite) TestCreateTables(c *C) { } rules, newTables, err := client.CreateTables(s.mock.Domain, tables, 0) c.Assert(err, IsNil) + // make sure tables and newTables have same order + for i, t := range tables { + c.Assert(newTables[i].Name, Equals, t.Info.Name) + } for _, nt := range newTables { c.Assert(nt.Name.String(), Matches, "test[0-3]") } diff --git a/pkg/restore/db.go b/pkg/restore/db.go index be24a1ad9..6197ff7a2 100644 --- a/pkg/restore/db.go +++ b/pkg/restore/db.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "sort" - "strings" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -70,57 +69,28 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { // CreateDatabase executes a CREATE DATABASE SQL. func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { - createSQL, err := db.se.ShowCreateDatabase(schema) + err := db.se.CreateDatabase(ctx, schema) if err != nil { - log.Error("build create database SQL failed", zap.Stringer("db", schema.Name), zap.Error(err)) - return errors.Trace(err) - } - err = db.se.Execute(ctx, createSQL) - if err != nil { - log.Error("create database failed", zap.String("query", createSQL), zap.Error(err)) + log.Error("create database failed", zap.Stringer("db", schema.Name), zap.Error(err)) } return errors.Trace(err) } // CreateTable executes a CREATE TABLE SQL. 
func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error { - tableInfo := table.Info - createSQL, err := db.se.ShowCreateTable(tableInfo, newIDAllocator(tableInfo.AutoIncID)) - if err != nil { - log.Error( - "build create table SQL failed", - zap.Stringer("db", table.Db.Name), - zap.Stringer("table", tableInfo.Name), - zap.Error(err)) - return errors.Trace(err) - } - switchDbSQL := fmt.Sprintf("use %s;", utils.EncloseName(table.Db.Name.O)) - err = db.se.Execute(ctx, switchDbSQL) - if err != nil { - log.Error("switch db failed", - zap.String("SQL", switchDbSQL), - zap.Stringer("db", table.Db.Name), - zap.Error(err)) - return errors.Trace(err) - } - // Insert `IF NOT EXISTS` statement to skip the created tables - words := strings.SplitN(createSQL, " ", 3) - if len(words) > 2 && strings.ToUpper(words[0]) == "CREATE" && strings.ToUpper(words[1]) == "TABLE" { - createSQL = "CREATE TABLE IF NOT EXISTS " + words[2] - } - err = db.se.Execute(ctx, createSQL) + err := db.se.CreateTable(ctx, table.Db.Name, table.Info) if err != nil { log.Error("create table failed", - zap.String("SQL", createSQL), zap.Stringer("db", table.Db.Name), zap.Stringer("table", table.Info.Name), zap.Error(err)) return errors.Trace(err) } alterAutoIncIDSQL := fmt.Sprintf( - "alter table %s auto_increment = %d", - utils.EncloseName(tableInfo.Name.O), - tableInfo.AutoIncID) + "alter table %s.%s auto_increment = %d", + utils.EncloseName(table.Db.Name.O), + utils.EncloseName(table.Info.Name.O), + table.Info.AutoIncID) err = db.se.Execute(ctx, alterAutoIncIDSQL) if err != nil { log.Error("alter AutoIncID failed", @@ -203,19 +173,26 @@ func FilterDDLJobs(allDDLJobs []*model.Job, tables []*utils.Table) (ddlJobs []*m } } + type namePair struct { + db string + table string + } + for _, table := range tables { tableIDs := make(map[int64]bool) tableIDs[table.Info.ID] = true - tableNames := make(map[string]bool) - tableNames[table.Info.Name.String()] = true + tableNames := make(map[namePair]bool) + name := namePair{table.Db.Name.String(), table.Info.Name.String()} + tableNames[name] = true for _, job := range allDDLJobs { if job.BinlogInfo.TableInfo != nil { - if tableIDs[job.TableID] || tableNames[job.BinlogInfo.TableInfo.Name.String()] { + name := namePair{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()} + if tableIDs[job.TableID] || tableNames[name] { ddlJobs = append(ddlJobs, job) tableIDs[job.TableID] = true // For truncate table, the id may be changed tableIDs[job.BinlogInfo.TableInfo.ID] = true - tableNames[job.BinlogInfo.TableInfo.Name.String()] = true + tableNames[name] = true } } } diff --git a/pkg/restore/import.go b/pkg/restore/import.go index fec07a870..ee5cef6ca 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -6,7 +6,6 @@ import ( "bytes" "context" "crypto/tls" - "strings" "sync" "time" @@ -176,7 +175,11 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error { // Import tries to import a file. // All rules must contain encoded keys. 
-func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRules) error { +func (importer *FileImporter) Import( + file *backup.File, + rejectStoreMap map[uint64]bool, + rewriteRules *RewriteRules, +) error { log.Debug("import file", zap.Stringer("file", file)) // Rewrite the start key and end key of file to scan regions var startKey, endKey []byte @@ -194,6 +197,9 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul zap.Stringer("file", file), zap.Binary("startKey", startKey), zap.Binary("endKey", endKey)) + + needReject := len(rejectStoreMap) > 0 + err = utils.WithRetry(importer.ctx, func() error { ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime) defer cancel() @@ -203,6 +209,23 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul if errScanRegion != nil { return errors.Trace(errScanRegion) } + + if needReject { + // TODO remove when TiFlash support restore + startTime := time.Now() + log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStoreMap)) + for _, region := range regionInfos { + if !waitForRemoveRejectStores(ctx, importer.metaClient, region, rejectStoreMap) { + log.Error("waiting for removing rejected stores failed", + zap.Stringer("region", region.Region)) + return errors.New("waiting for removing rejected stores failed") + } + } + log.Info("waiting for removing rejected stores done", + zap.Int("regions", len(regionInfos)), zap.Duration("take", time.Since(startTime))) + needReject = false + } + log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos))) // Try to download and ingest the file in every region for _, regionInfo := range regionInfos { @@ -272,14 +295,12 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul // 2. retry ingest errIngest = errors.AddStack(errEpochNotMatch) break ingestRetry - case errPb.RegionNotFound != nil: - errIngest = errors.AddStack(errRegionNotFound) - break ingestRetry case errPb.KeyNotInRegion != nil: errIngest = errors.AddStack(errKeyNotInRegion) break ingestRetry default: - errIngest = errors.Errorf("ingest error %s", errPb) + // Other errors like `ServerIsBusy`, `RegionNotFound`, etc. 
should be retryable + errIngest = errors.Annotatef(errIngestFailed, "ingest error %s", errPb) break ingestRetry } } @@ -346,7 +367,10 @@ func (importer *FileImporter) downloadSST( for _, peer := range regionInfo.Region.GetPeers() { resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req) if err != nil { - return nil, extractDownloadSSTError(err) + return nil, errors.Annotatef(errGrpc, "%s", err) + } + if resp.GetError() != nil { + return nil, errors.Annotate(errDownloadFailed, resp.GetError().GetMessage()) } if resp.GetIsEmpty() { return nil, errors.Trace(errRangeIsEmpty) @@ -395,7 +419,10 @@ func (importer *FileImporter) downloadRawKVSST( for _, peer := range regionInfo.Region.GetPeers() { resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req) if err != nil { - return nil, extractDownloadSSTError(err) + return nil, errors.Annotatef(errGrpc, "%s", err) + } + if resp.GetError() != nil { + return nil, errors.Annotate(errDownloadFailed, resp.GetError().GetMessage()) } if resp.GetIsEmpty() { return nil, errors.Trace(errRangeIsEmpty) @@ -439,18 +466,3 @@ func checkRegionEpoch(new, old *RegionInfo) bool { } return false } - -func extractDownloadSSTError(e error) error { - err := errGrpc - switch { - case strings.Contains(e.Error(), "bad format"): - err = errBadFormat - case strings.Contains(e.Error(), "wrong prefix"): - err = errWrongKeyPrefix - case strings.Contains(e.Error(), "corrupted"): - err = errFileCorrupted - case strings.Contains(e.Error(), "Cannot read"): - err = errCannotRead - } - return errors.Annotatef(err, "%s", e) -} diff --git a/pkg/restore/split.go b/pkg/restore/split.go index 03153097a..4138d0012 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -63,7 +63,6 @@ func (rs *RegionSplitter) Split( ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules, - rejectStores map[uint64]bool, onSplit OnSplitFunc, ) error { if len(ranges) == 0 { @@ -95,14 +94,12 @@ func (rs *RegionSplitter) Split( } interval := SplitRetryInterval scatterRegions := make([]*RegionInfo, 0) - allRegions := make([]*RegionInfo, 0) SplitRegions: for i := 0; i < SplitRetryTimes; i++ { regions, errScan := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit) if errScan != nil { return errors.Trace(errScan) } - allRegions = append(allRegions, regions...) 
if len(regions) == 0 { log.Warn("cannot scan any region") return nil @@ -145,19 +142,6 @@ SplitRegions: if errSplit != nil { return errors.Trace(errSplit) } - if len(rejectStores) > 0 { - startTime = time.Now() - log.Info("start to wait for removing rejected stores", zap.Reflect("rejectStores", rejectStores)) - for _, region := range allRegions { - if !rs.waitForRemoveRejectStores(ctx, region, rejectStores) { - log.Error("waiting for removing rejected stores failed", - zap.Stringer("region", region.Region)) - return errors.New("waiting for removing rejected stores failed") - } - } - log.Info("waiting for removing rejected stores done", - zap.Int("regions", len(allRegions)), zap.Duration("take", time.Since(startTime))) - } log.Info("start to wait for scattering regions", zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) startTime = time.Now() @@ -211,30 +195,6 @@ func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID return ok, nil } -func (rs *RegionSplitter) hasRejectStorePeer( - ctx context.Context, - regionID uint64, - rejectStores map[uint64]bool, -) (bool, error) { - regionInfo, err := rs.client.GetRegionByID(ctx, regionID) - if err != nil { - return false, err - } - if regionInfo == nil { - return false, nil - } - for _, peer := range regionInfo.Region.GetPeers() { - if rejectStores[peer.GetStoreId()] { - return true, nil - } - } - retryTimes := ctx.Value(retryTimes).(int) - if retryTimes > 10 { - log.Warn("get region info", zap.Stringer("region", regionInfo.Region)) - } - return false, nil -} - func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) { interval := SplitCheckInterval for i := 0; i < SplitCheckMaxRetryTimes; i++ { @@ -280,36 +240,6 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo * } } -func (rs *RegionSplitter) waitForRemoveRejectStores( - ctx context.Context, - regionInfo *RegionInfo, - rejectStores map[uint64]bool, -) bool { - interval := RejectStoreCheckInterval - regionID := regionInfo.Region.GetId() - for i := 0; i < RejectStoreCheckRetryTimes; i++ { - ctx1 := context.WithValue(ctx, retryTimes, i) - ok, err := rs.hasRejectStorePeer(ctx1, regionID, rejectStores) - if err != nil { - log.Warn("wait for rejecting store failed", - zap.Stringer("region", regionInfo.Region), - zap.Error(err)) - return false - } - // Do not have any peer in the rejected store, return true - if !ok { - return true - } - interval = 2 * interval - if interval > RejectStoreMaxCheckInterval { - interval = RejectStoreMaxCheckInterval - } - time.Sleep(interval) - } - - return false -} - func (rs *RegionSplitter) splitAndScatterRegions( ctx context.Context, regionInfo *RegionInfo, keys [][]byte, ) ([]*RegionInfo, error) { diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 06dab1cf1..b21cbf781 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -193,7 +193,7 @@ func (s *testRestoreUtilSuite) TestSplit(c *C) { regionSplitter := NewRegionSplitter(client) ctx := context.Background() - err := regionSplitter.Split(ctx, ranges, rewriteRules, map[uint64]bool{}, func(key [][]byte) {}) + err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {}) if err != nil { c.Assert(err, IsNil, Commentf("split regions failed: %v", err)) } diff --git a/pkg/restore/util.go b/pkg/restore/util.go index c49c07994..2652b1e7b 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -16,56 +16,17 @@ import ( 
"github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/parser/model" - "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" - "github.com/pingcap/br/pkg/conn" + "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/summary" ) var recordPrefixSep = []byte("_r") -// idAllocator always returns a specified ID -type idAllocator struct { - id int64 -} - -func newIDAllocator(id int64) *idAllocator { - return &idAllocator{id: id} -} - -func (alloc *idAllocator) Alloc(tableID int64, n uint64, increment, offset int64) (min int64, max int64, err error) { - return alloc.id, alloc.id, nil -} - -func (alloc *idAllocator) AllocSeqCache(sequenceID int64) (min int64, max int64, round int64, err error) { - // TODO fix this function after support backup sequence - return 0, 0, 0, nil -} - -func (alloc *idAllocator) Rebase(tableID, newBase int64, allocIDs bool) error { - return nil -} - -func (alloc *idAllocator) Base() int64 { - return alloc.id -} - -func (alloc *idAllocator) End() int64 { - return alloc.id -} - -func (alloc *idAllocator) NextGlobalAutoID(tableID int64) (int64, error) { - return alloc.id, nil -} - -func (alloc *idAllocator) GetType() autoid.AllocatorType { - return autoid.RowIDAllocType -} - // GetRewriteRules returns the rewrite rule of the new table and the old table. func GetRewriteRules( newTable *model.TableInfo, @@ -309,6 +270,9 @@ func matchNewPrefix(key []byte, rewriteRules *RewriteRules) *import_sstpb.Rewrit } func truncateTS(key []byte) []byte { + if len(key) == 0 { + return nil + } return key[:len(key)-8] } @@ -320,7 +284,7 @@ func SplitRanges( client *Client, ranges []rtree.Range, rewriteRules *RewriteRules, - updateCh chan<- struct{}, + updateCh glue.Progress, ) error { start := time.Now() defer func() { @@ -328,18 +292,10 @@ func SplitRanges( summary.CollectDuration("split region", elapsed) }() splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig())) - tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly) - if err != nil { - return errors.Trace(err) - } - storeMap := make(map[uint64]bool) - for _, store := range tiflashStores { - storeMap[store.GetId()] = true - } - return splitter.Split(ctx, ranges, rewriteRules, storeMap, func(keys [][]byte) { + return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) { for range keys { - updateCh <- struct{}{} + updateCh.Inc() } }) } @@ -412,3 +368,59 @@ func paginateScanRegion( } return regions, nil } + +func hasRejectStorePeer( + ctx context.Context, + client SplitClient, + regionID uint64, + rejectStores map[uint64]bool, +) (bool, error) { + regionInfo, err := client.GetRegionByID(ctx, regionID) + if err != nil { + return false, err + } + if regionInfo == nil { + return false, nil + } + for _, peer := range regionInfo.Region.GetPeers() { + if rejectStores[peer.GetStoreId()] { + return true, nil + } + } + retryTimes := ctx.Value(retryTimes).(int) + if retryTimes > 10 { + log.Warn("get region info", zap.Stringer("region", regionInfo.Region)) + } + return false, nil +} + +func waitForRemoveRejectStores( + ctx context.Context, + client SplitClient, + regionInfo *RegionInfo, + rejectStores map[uint64]bool, +) bool { + interval := RejectStoreCheckInterval + regionID := regionInfo.Region.GetId() + for i := 0; i < RejectStoreCheckRetryTimes; i++ { + ctx1 := context.WithValue(ctx, retryTimes, i) + ok, err := 
hasRejectStorePeer(ctx1, client, regionID, rejectStores) + if err != nil { + log.Warn("wait for rejecting store failed", + zap.Stringer("region", regionInfo.Region), + zap.Error(err)) + return false + } + // Do not have any peer in the rejected store, return true + if !ok { + return true + } + interval = 2 * interval + if interval > RejectStoreMaxCheckInterval { + interval = RejectStoreMaxCheckInterval + } + time.Sleep(interval) + } + + return false +} diff --git a/pkg/summary/collector.go b/pkg/summary/collector.go index ee465d60b..76dd8a121 100644 --- a/pkg/summary/collector.go +++ b/pkg/summary/collector.go @@ -35,12 +35,33 @@ type LogCollector interface { CollectInt(name string, t int) + SetSuccessStatus(success bool) + Summary(name string) } type logFunc func(msg string, fields ...zap.Field) -var collector = newLogCollector(log.Info) +var collector LogCollector = newLogCollector(log.Info) + +// InitCollector initializes the global collector instance. +func InitCollector( // revive:disable-line:flag-parameter + hasLogFile bool, +) { + logF := log.L().Info + if hasLogFile { + conf := new(log.Config) + // Always duplicate summary to stdout. + logger, _, err := log.InitLogger(conf) + if err == nil { + logF = func(msg string, fields ...zap.Field) { + logger.Info(msg, fields...) + log.Info(msg, fields...) + } + } + } + collector = newLogCollector(logF) +} type logCollector struct { mu sync.Mutex @@ -52,6 +73,7 @@ type logCollector struct { failureReasons map[string]error durations map[string]time.Duration ints map[string]int + successStatus bool log logFunc } @@ -117,6 +139,12 @@ func (tc *logCollector) CollectInt(name string, t int) { tc.ints[name] += t } +func (tc *logCollector) SetSuccessStatus(success bool) { + tc.mu.Lock() + defer tc.mu.Unlock() + tc.successStatus = success +} + func (tc *logCollector) Summary(name string) { tc.mu.Lock() defer func() { @@ -145,7 +173,7 @@ func (tc *logCollector) Summary(name string) { logFields = append(logFields, zap.Int(key, val)) } - if len(tc.failureReasons) != 0 { + if len(tc.failureReasons) != 0 || !tc.successStatus { for unitName, reason := range tc.failureReasons { logFields = append(logFields, zap.String("unitName", unitName), zap.Error(reason)) } diff --git a/pkg/summary/collector_test.go b/pkg/summary/collector_test.go index 7dff32dd1..165232f55 100644 --- a/pkg/summary/collector_test.go +++ b/pkg/summary/collector_test.go @@ -30,6 +30,7 @@ func (suit *testCollectorSuite) TestSumDurationInt(c *C) { col.CollectDuration("b", time.Second) col.CollectInt("c", 2) col.CollectInt("c", 2) + col.SetSuccessStatus(true) col.Summary("foo") c.Assert(len(fields), Equals, 3) diff --git a/pkg/summary/summary.go b/pkg/summary/summary.go index 3ffdedf8a..852e936a9 100644 --- a/pkg/summary/summary.go +++ b/pkg/summary/summary.go @@ -29,6 +29,11 @@ func CollectInt(name string, t int) { collector.CollectInt(name, t) } +// SetSuccessStatus sets final success status +func SetSuccessStatus(success bool) { + collector.SetSuccessStatus(success) +} + // Summary outputs summary log func Summary(name string) { collector.Summary(name) diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 5944a22a0..040be0444 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/br/pkg/glue" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" - "github.com/pingcap/br/pkg/utils" ) const ( @@ -127,6 +126,10 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig if err != nil { return err } + // 
nothing to backup + if ranges == nil { + return nil + } ddlJobs := make([]*model.Job, 0) if cfg.LastBackupTS > 0 { @@ -160,7 +163,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig // Backup // Redirect to log if there is no log file to avoid unreadable output. - updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) req := kvproto.BackupRequest{ @@ -175,14 +178,14 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return err } // Backup has finished - close(updateCh) + updateCh.Close() // Checksum backupSchemasConcurrency := backup.DefaultSchemaConcurrency if backupSchemas.Len() < backupSchemasConcurrency { backupSchemasConcurrency = backupSchemas.Len() } - updateCh = utils.StartProgress( + updateCh = g.StartProgress( ctx, "Checksum", int64(backupSchemas.Len()), !cfg.LogProgress) backupSchemas.SetSkipChecksum(!cfg.Checksum) backupSchemas.Start( @@ -209,12 +212,15 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig log.Info("Skip fast checksum in incremental backup") } // Checksum has finished - close(updateCh) + updateCh.Close() err = client.SaveBackupMeta(ctx, ddlJobs) if err != nil { return err } + + // Set task summary to success status. + summary.SetSuccessStatus(true) return nil } diff --git a/pkg/task/backup_raw.go b/pkg/task/backup_raw.go index d9deaccba..fefcc2cf1 100644 --- a/pkg/task/backup_raw.go +++ b/pkg/task/backup_raw.go @@ -117,7 +117,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf // Backup // Redirect to log if there is no log file to avoid unreadable output. - updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, cmdName, int64(approximateRegions), !cfg.LogProgress) req := kvproto.BackupRequest{ @@ -134,12 +134,15 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf return err } // Backup has finished - close(updateCh) + updateCh.Close() // Checksum err = client.SaveBackupMeta(ctx, nil) if err != nil { return err } + + // Set task summary to success status. + summary.SetSuccessStatus(true) return nil } diff --git a/pkg/task/common.go b/pkg/task/common.go index 61186abe1..e3b48748e 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -95,7 +95,7 @@ type Config struct { // DefineCommonFlags defines the flags common to all BRIE commands. 
func DefineCommonFlags(flags *pflag.FlagSet) { flags.BoolP(flagSendCreds, "c", true, "Whether send credentials to tikv") - flags.StringP(flagStorage, "s", "", `specify the url where backup storage, eg, "s3:///path/to/save"`) + flags.StringP(flagStorage, "s", "", `specify the URL of the backup storage, eg, "s3://bucket/path/prefix"`) flags.StringSliceP(flagPD, "u", []string{"127.0.0.1:2379"}, "PD address") flags.String(flagCA, "", "CA certificate path for TLS connection") flags.String(flagCert, "", "Certificate path for TLS connection") diff --git a/pkg/task/restore.go b/pkg/task/restore.go index 7d5dd6846..9dce5139e 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/filter" + "github.com/pingcap/tidb/config" "github.com/spf13/pflag" "go.uber.org/zap" @@ -23,7 +24,8 @@ import ( ) const ( - flagOnline = "online" + flagOnline = "online" + flagNoSchema = "no-schema" ) var schedulers = map[string]struct{}{ @@ -45,13 +47,18 @@ const ( type RestoreConfig struct { Config - Online bool `json:"online" toml:"online"` + Online bool `json:"online" toml:"online"` + NoSchema bool `json:"no-schema" toml:"no-schema"` } // DefineRestoreFlags defines common flags for the restore command. func DefineRestoreFlags(flags *pflag.FlagSet) { // TODO remove experimental tag if it's stable - flags.Bool("online", false, "(experimental) Whether online when restore") + flags.Bool(flagOnline, false, "(experimental) Whether online when restore") + flags.Bool(flagNoSchema, false, "skip creating schemas and tables, reuse existing empty ones") + + // Do not expose this flag + _ = flags.MarkHidden(flagNoSchema) } // ParseFromFlags parses the restore-related flags from the flag set. 
@@ -61,6 +68,10 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Trace(err) } + cfg.NoSchema, err = flags.GetBool(flagNoSchema) + if err != nil { + return errors.Trace(err) + } err = cfg.Config.ParseFromFlags(flags) if err != nil { return errors.Trace(err) } @@ -101,6 +112,9 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if cfg.Online { client.EnableOnline() } + if cfg.NoSchema { + client.EnableSkipCreateSQL() + } err = client.LoadRestoreStores(ctx) if err != nil { return err @@ -118,13 +132,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf return errors.New("cannot do transactional restore from raw kv data") } - files, tables, err := filterRestoreFiles(client, cfg) + files, tables, dbs, err := filterRestoreFiles(client, cfg) if err != nil { return err } - if len(files) == 0 { - return errors.New("all files are filtered out from the backup archive, nothing to restore") - } var newTS uint64 if client.IsIncremental() { @@ -137,10 +148,34 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if err != nil { return err } + // execute DDLs first + + // set max-index-length before executing DDLs and creating tables + // we set this value to the maximum (3072*4), otherwise we might fail to restore tables + // when upstream and downstream both set this value greater than the default (3072) + conf := config.GetGlobalConfig() + conf.MaxIndexLength = config.DefMaxOfMaxIndexLength + config.StoreGlobalConfig(conf) + log.Warn("set max-index-length to max(3072*4) to skip check index length in DDL") + err = client.ExecDDLs(ddlJobs) if err != nil { return errors.Trace(err) } + + // nothing to restore, maybe only DDL changes in an incremental restore + if len(files) == 0 { + log.Info("all files are filtered out from the backup archive, nothing to restore") + return nil + } + + for _, db := range dbs { + err = client.CreateDatabase(db.Info) + if err != nil { + return err + } + } + rewriteRules, newTables, err := client.CreateTables(mgr.GetDomain(), tables, newTS) if err != nil { return err @@ -149,7 +184,8 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if err != nil { return err } - err = client.RemoveTiFlashReplica(tables, placementRules) + + err = client.RemoveTiFlashReplica(tables, newTables, placementRules) if err != nil { return err } @@ -171,7 +207,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf ranges = restore.AttachFilesToRanges(files, ranges) // Redirect to log if there is no log file to avoid unreadable output. - updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, cmdName, // Split/Scatter + Download/Ingest @@ -197,6 +233,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if batchSize > maxRestoreBatchSizeLimit { batchSize = maxRestoreBatchSizeLimit // 256 } + + tiflashStores, err := conn.GetAllTiKVStores(ctx, client.GetPDClient(), conn.TiFlashOnly) + if err != nil { + return errors.Trace(err) + } + rejectStoreMap := make(map[uint64]bool) + for _, store := range tiflashStores { + rejectStoreMap[store.GetId()] = true + } + for { if len(ranges) == 0 { break @@ -221,7 +267,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf } // After split, we can restore backup files. 
- err = client.RestoreFiles(fileBatch, rewriteRules, updateCh) + err = client.RestoreFiles(fileBatch, rewriteRules, rejectStoreMap, updateCh) if err != nil { break } @@ -229,38 +275,44 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // Always run the post-work even on error, so we don't stuck in the import // mode or emptied schedulers - err = restorePostWork(ctx, client, mgr, clusterCfg) - if err != nil { - return err + if errRestorePostWork := restorePostWork(ctx, client, mgr, clusterCfg); err == nil { + err = errRestorePostWork + } + + if errSplitPostWork := splitPostWork(ctx, client, newTables); err == nil { + err = errSplitPostWork } - if err = splitPostWork(ctx, client, newTables); err != nil { + // If any error happened, return now, don't execute checksum. + if err != nil { return err } // Restore has finished. - close(updateCh) + updateCh.Close() // Checksum - updateCh = utils.StartProgress( + updateCh = g.StartProgress( ctx, "Checksum", int64(len(newTables)), !cfg.LogProgress) err = client.ValidateChecksum( ctx, mgr.GetTiKV().GetClient(), tables, newTables, updateCh) if err != nil { return err } - close(updateCh) + updateCh.Close() + // Set task summary to success status. + summary.SetSuccessStatus(true) return nil } func filterRestoreFiles( client *restore.Client, cfg *RestoreConfig, -) (files []*backup.File, tables []*utils.Table, err error) { +) (files []*backup.File, tables []*utils.Table, dbs []*utils.Database, err error) { tableFilter, err := filter.New(cfg.CaseSensitive, &cfg.Filter) if err != nil { - return nil, nil, err + return nil, nil, nil, err } for _, db := range client.GetDatabases() { @@ -271,17 +323,13 @@ func filterRestoreFiles( } if !createdDatabase { - if err = client.CreateDatabase(db.Info); err != nil { - return nil, nil, err - } + dbs = append(dbs, db) createdDatabase = true } - files = append(files, table.Files...) tables = append(tables, table) } } - return } @@ -399,7 +447,7 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf for _, db := range dbs { tables = append(tables, db.Tables...) } - updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, "RecoverTiflashReplica", int64(len(tables)), !cfg.LogProgress) for _, t := range tables { log.Info("get table", zap.Stringer("name", t.Info.Name), @@ -409,10 +457,13 @@ func RunRestoreTiflashReplica(c context.Context, g glue.Glue, cmdName string, cf if err != nil { return err } - updateCh <- struct{}{} + updateCh.Inc() } } + updateCh.Close() summary.CollectInt("recover tables", len(tables)) + // Set task summary to success status. + summary.SetSuccessStatus(true) return nil } diff --git a/pkg/task/restore_raw.go b/pkg/task/restore_raw.go index 308a44b4e..03e987456 100644 --- a/pkg/task/restore_raw.go +++ b/pkg/task/restore_raw.go @@ -46,7 +46,7 @@ func (cfg *RestoreRawConfig) ParseFromFlags(flags *pflag.FlagSet) error { } // RunRestoreRaw starts a raw kv restore task inside the current goroutine. -func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreRawConfig) error { +func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreRawConfig) (err error) { defer summary.Summary(cmdName) ctx, cancel := context.WithCancel(c) defer cancel() @@ -97,7 +97,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR // Redirect to log if there is no log file to avoid unreadable output. // TODO: How to show progress? 
- updateCh := utils.StartProgress( + updateCh := g.StartProgress( ctx, "Raw Restore", // Split/Scatter + Download/Ingest @@ -113,18 +113,22 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR if err != nil { return errors.Trace(err) } + defer func() { + errPostWork := restorePostWork(ctx, client, mgr, removedSchedulers) + if err == nil { + err = errPostWork + } + }() err = client.RestoreRaw(cfg.StartKey, cfg.EndKey, files, updateCh) if err != nil { return errors.Trace(err) } - err = restorePostWork(ctx, client, mgr, removedSchedulers) - if err != nil { - return errors.Trace(err) - } // Restore has finished. - close(updateCh) + updateCh.Close() + // Set task summary to success status. + summary.SetSuccessStatus(true) return nil } diff --git a/tests/README.md b/tests/README.md index 814241b4a..fbb018505 100644 --- a/tests/README.md +++ b/tests/README.md @@ -11,6 +11,7 @@ programs. * `bin/pd-server` * `bin/pd-ctl` * `bin/go-ycsb` + * `bin/minio` The versions must be ≥2.1.0 as usual. @@ -33,7 +34,7 @@ Run `make integration_test` to execute the integration tests. This command will 2. Check that all 6 required executables and `br` executable exist 3. Execute `tests/run.sh` -If the first tow steps are done before, you could also run `tests/run.sh` directly. +If the first two steps are done before, you could also run `tests/run.sh` directly. This script will 1. Start PD, TiKV and TiDB in background with local storage diff --git a/tests/_utils/run_services b/tests/_utils/run_services index 07fe1a2ad..f31152932 100644 --- a/tests/_utils/run_services +++ b/tests/_utils/run_services @@ -38,6 +38,9 @@ stop_services() { start_services() { stop_services + TIDB_CONFIG="${1-tests}/config/tidb.toml" + TIKV_CONFIG="${1-tests}/config/tikv.toml" + echo "Starting PD..." mkdir -p "$TEST_DIR/pd" bin/pd-server \ @@ -63,7 +66,7 @@ start_services() { -A "$TIKV_ADDR$i" \ --status-addr "$TIKV_STATUS_ADDR$i" \ --log-file "$TEST_DIR/tikv${i}.log" \ - -C "tests/config/tikv.toml" \ + -C "$TIKV_CONFIG" \ -s "$TEST_DIR/tikv${i}" & done @@ -83,7 +86,7 @@ start_services() { --status 10080 \ --store tikv \ --path "$PD_ADDR" \ - --config "tests/config/tidb.toml" \ + --config "$TIDB_CONFIG" \ --log-file "$TEST_DIR/tidb.log" & echo "Verifying TiDB is started..." diff --git a/tests/br_alter_pk_server/config/tidb.toml b/tests/br_alter_pk_server/config/tidb.toml new file mode 100644 index 000000000..30b7d4869 --- /dev/null +++ b/tests/br_alter_pk_server/config/tidb.toml @@ -0,0 +1,8 @@ +# config of tidb + +# Schema lease duration +# There are a lot of DDLs in the tests, so set this +# to 360s to test whether BR shuts down gracefully. +lease = "360s" + +alter-primary-key = true diff --git a/tests/br_alter_pk_server/config/tikv.toml b/tests/br_alter_pk_server/config/tikv.toml new file mode 100644 index 000000000..edcd02a98 --- /dev/null +++ b/tests/br_alter_pk_server/config/tikv.toml @@ -0,0 +1,14 @@ +# config of tikv + +[coprocessor] +region-max-keys = 20 +region-split-keys = 12 + +[rocksdb] +max-open-files = 4096 +[raftdb] +max-open-files = 4096 +[raftstore] +# true (default value) for high reliability, this can prevent data loss when power failure. +sync-log = false +capacity = "10GB" diff --git a/tests/br_alter_pk_server/run.sh b/tests/br_alter_pk_server/run.sh new file mode 100755 index 000000000..6485a43be --- /dev/null +++ b/tests/br_alter_pk_server/run.sh @@ -0,0 +1,42 @@ +#!/bin/bash +# +# Copyright 2020 PingCAP, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $cur/../_utils/run_services + +DB="$TEST_NAME" + +# prepare database +echo "Restart cluster with alter-primary-key = true" +start_services "$cur" + +run_sql "drop schema if exists $DB;" +run_sql "create schema $DB;" + +run_sql "create table $DB.a (a int primary key, b int unique);" +run_sql "insert into $DB.a values (42, 42);" + +# backup +run_br --pd $PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB" + +# restore +run_sql "drop schema $DB;" +run_br --pd $PD_ADDR restore db --db "$DB" -s "local://$TEST_DIR/$DB" + +run_sql "drop schema $DB;" +echo "Restart service with alter-primary-key = false" +start_services diff --git a/tests/br_db_skip/run.sh b/tests/br_db_skip/run.sh new file mode 100755 index 000000000..e126447c6 --- /dev/null +++ b/tests/br_db_skip/run.sh @@ -0,0 +1,72 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" + +run_sql "CREATE DATABASE $DB;" + +run_sql "CREATE TABLE $DB.usertable1 ( \ + YCSB_KEY varchar(64) NOT NULL, \ + FIELD0 varchar(1) DEFAULT NULL, \ + PRIMARY KEY (YCSB_KEY) \ +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;" + +run_sql "INSERT INTO $DB.usertable1 VALUES (\"a\", \"b\");" +run_sql "INSERT INTO $DB.usertable1 VALUES (\"aa\", \"b\");" + +# backup db +echo "backup start..." 
+run_br --pd $PD_ADDR backup db --db "$DB" -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 + +run_sql "DROP DATABASE $DB;" + +run_sql "CREATE DATABASE $DB;" +# restore db with no-schema must fail because the tables do not exist yet +echo "restore start but must fail" +fail=false +run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --no-schema=true || fail=true +if $fail; then + # Error: [schema:1146]Table 'br_db_skip.usertable1' doesn't exist + echo "TEST: [$TEST_NAME] restore $DB with no-schema failed as expected" +else + echo "TEST: [$TEST_NAME] restore $DB with no-schema did not fail" + exit 1 +fi + + +run_sql "CREATE TABLE $DB.usertable1 ( \ + YCSB_KEY varchar(64) NOT NULL, \ + FIELD0 varchar(1) DEFAULT NULL, \ + PRIMARY KEY (YCSB_KEY) \ +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;" + +echo "restore start must succeed" +fail=false +run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --no-schema=true || fail=true +if $fail; then + echo "TEST: [$TEST_NAME] restore $DB with no-schema failed" + exit 1 +else + echo "TEST: [$TEST_NAME] restore $DB with no-schema succeeded" +fi + +table_count=$(run_sql "use $DB; show tables;" | grep "Tables_in" | wc -l) +if [ "$table_count" -ne "1" ];then + echo "TEST: [$TEST_NAME] failed!" + exit 1 +fi + +run_sql "DROP DATABASE $DB;" diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh index e50ef1ecf..93c5b28fb 100755 --- a/tests/br_full_ddl/run.sh +++ b/tests/br_full_ddl/run.sh @@ -36,7 +36,10 @@ done # backup full echo "backup start..." -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG +# Do not log to terminal +unset BR_LOG_TO_TERM +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG || cat $LOG +BR_LOG_TO_TERM=1 checksum_count=$(cat $LOG | grep "fast checksum success" | wc -l | xargs) diff --git a/tests/br_full_index/run.sh b/tests/br_full_index/run.sh index 5069035e6..bb2486802 100755 --- a/tests/br_full_index/run.sh +++ b/tests/br_full_index/run.sh @@ -36,7 +36,10 @@ done # backup full echo "backup start..." -run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG +# Do not log to terminal +unset BR_LOG_TO_TERM +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG || cat $LOG +BR_LOG_TO_TERM=1 checksum_count=$(cat $LOG | grep "fast checksum success" | wc -l | xargs) diff --git a/tests/br_incremental_only_ddl/run.sh b/tests/br_incremental_only_ddl/run.sh new file mode 100755 index 000000000..f525acda4 --- /dev/null +++ b/tests/br_incremental_only_ddl/run.sh @@ -0,0 +1,72 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" +TABLE="usertable" +ROW_COUNT=100 +PATH="tests/$TEST_NAME:bin:$PATH" + +echo "load data..." 
+# create database +run_sql "CREATE DATABASE IF NOT EXISTS $DB;" +# create table +run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (c1 INT);" +# insert records +for i in $(seq $ROW_COUNT); do + run_sql "INSERT INTO ${DB}.${TABLE}(c1) VALUES ($i);" +done + +# full backup +echo "full backup start..." +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 +# run ddls +echo "run ddls..." +run_sql "RENAME TABLE ${DB}.${TABLE} to ${DB}.${TABLE}1;" +run_sql "DROP TABLE ${DB}.${TABLE}1;" +run_sql "DROP DATABASE ${DB};" +run_sql "CREATE DATABASE ${DB};" +run_sql "CREATE TABLE ${DB}.${TABLE}1 (c2 CHAR(255));" +run_sql "RENAME TABLE ${DB}.${TABLE}1 to ${DB}.${TABLE};" +run_sql "TRUNCATE TABLE ${DB}.${TABLE};" + +# incremental backup +echo "incremental backup start..." +last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | tail -n1) +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/inc" --db $DB -t $TABLE --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts + +run_sql "DROP DATABASE $DB;" + +# full restore +echo "full restore start..." +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR +row_count_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore +if [ "${row_count_full}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] full restore fail on database $DB" + exit 1 +fi +# incremental restore +echo "incremental restore start..." +fail=false +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB/inc" --pd $PD_ADDR || fail=true +if $fail; then + echo "TEST: [$TEST_NAME] incremental restore fail on database $DB" + exit 1 +else + echo "TEST: [$TEST_NAME] succeeded!" +fi + +run_sql "DROP DATABASE $DB;" diff --git a/tests/br_incremental_same_table/run.sh b/tests/br_incremental_same_table/run.sh new file mode 100755 index 000000000..797806837 --- /dev/null +++ b/tests/br_incremental_same_table/run.sh @@ -0,0 +1,86 @@ +#!/bin/sh +# +# Copyright 2019 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" +TABLE="usertable" +ROW_COUNT=100 +PATH="tests/$TEST_NAME:bin:$PATH" +DB_COUNT=3 + +echo "load data..." + +# create database +run_sql "CREATE DATABASE IF NOT EXISTS $DB;" +# create table +run_sql "CREATE TABLE IF NOT EXISTS ${DB}.${TABLE} (c1 INT);" +# insert records +for i in $(seq $ROW_COUNT); do + run_sql "INSERT INTO ${DB}.${TABLE}(c1) VALUES ($i);" +done + +# full backup +echo "full backup start..." +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/full" --ratelimit 5 --concurrency 4 +# run ddls + +# create 3 databases, each db has one table with the same name +for i in $(seq $DB_COUNT); do + # create database + run_sql "CREATE DATABASE $DB$i;" + # create table + run_sql "CREATE TABLE IF NOT EXISTS $DB$i.${TABLE} (c1 INT);" + # insert records + for j in $(seq $ROW_COUNT); do + run_sql "INSERT INTO $DB$i.${TABLE}(c1) VALUES ($j);" + done +done + +# incremental backup +echo "incremental backup start..." 
+last_backup_ts=$(br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | tail -n1) +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB/inc" --ratelimit 5 --concurrency 4 --lastbackupts $last_backup_ts + +# cleanup env +run_sql "DROP DATABASE $DB;" +for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE $DB$i;" +done + +# full restore +echo "full restore start..." +run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR +row_count_full=$(run_sql "SELECT COUNT(*) FROM $DB.$TABLE;" | awk '/COUNT/{print $2}') +# check full restore
if [ "${row_count_full}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] full restore fail on database $DB" + exit 1 +fi + +# incremental restore only DB2.Table +echo "incremental restore start..." +run_br restore table --db ${DB}2 --table $TABLE -s "local://$TEST_DIR/$DB/inc" --pd $PD_ADDR +row_count_inc=$(run_sql "SELECT COUNT(*) FROM ${DB}2.$TABLE;" | awk '/COUNT/{print $2}') +# check incremental restore +if [ "${row_count_inc}" != "${ROW_COUNT}" ];then + echo "TEST: [$TEST_NAME] incremental restore fail on database $DB" + exit 1 +fi + +# cleanup env +run_sql "DROP DATABASE $DB;" +for i in $(seq $DB_COUNT); do + run_sql "DROP DATABASE IF EXISTS $DB$i;" +done diff --git a/tests/br_move_backup/run.sh b/tests/br_move_backup/run.sh index 43f27a9af..b85d25823 100755 --- a/tests/br_move_backup/run.sh +++ b/tests/br_move_backup/run.sh @@ -32,6 +32,15 @@ run_sql "DROP TABLE $DB.$TABLE;" # change backup path mv $TEST_DIR/$DB $TEST_DIR/another$DB +# restore table with old path must fail +echo "restore with old path start..." +run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/$DB" --pd $PD_ADDR || restore_old_fail=1 + +if [ "${restore_old_fail:-0}" -ne "1" ];then + echo "TEST: [$TEST_NAME] restore with old path did not fail as expected!" + exit 1 +fi + # restore table echo "restore start..." run_br restore table --db $DB --table $TABLE -s "local://$TEST_DIR/another$DB" --pd $PD_ADDR diff --git a/tests/run.sh b/tests/run.sh index 21d6b27ed..5b1111afd 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -39,5 +39,6 @@ for script in tests/${TEST_NAME-*}/run.sh; do TIKV_ADDR="$TIKV_ADDR" \ PATH="tests/_utils:bin:$PATH" \ TEST_NAME="$(basename "$(dirname "$script")")" \ + BR_LOG_TO_TERM=1 \ bash "$script" done
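For reviewers, the wait loop this patch moves into pkg/restore/util.go (waitForRemoveRejectStores) relies on a capped exponential backoff: the check interval doubles after every failed attempt but never exceeds a fixed ceiling. The following is a minimal, self-contained sketch of that pattern only; it is not part of the patch, and the constant values, function names, and the simulated check are illustrative assumptions rather than the RejectStoreCheckInterval/RejectStoreCheckRetryTimes constants defined in the BR code.

// backoff_sketch.go: capped exponential backoff, standalone illustration.
package main

import (
	"fmt"
	"time"
)

const (
	// Assumed values for the sketch; the real constants live in pkg/restore.
	checkInterval    = 50 * time.Millisecond
	maxCheckInterval = 500 * time.Millisecond
	checkRetryTimes  = 6
)

// waitUntil polls `done` until it reports true or the retry budget is exhausted.
func waitUntil(done func(attempt int) bool) bool {
	interval := checkInterval
	for i := 0; i < checkRetryTimes; i++ {
		if done(i) {
			return true
		}
		// Double the interval on every retry, but cap it at the ceiling.
		interval = 2 * interval
		if interval > maxCheckInterval {
			interval = maxCheckInterval
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	ok := waitUntil(func(attempt int) bool {
		fmt.Println("check attempt", attempt)
		// Pretend the peer on the rejected store disappears on the 4th check.
		return attempt >= 3
	})
	fmt.Println("rejected store removed:", ok)
}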