From 3e939c8555e46c56c279f997a86a52bd4e645d3c Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 4 Feb 2020 16:23:23 +0800 Subject: [PATCH 01/12] restore: enhance error handling Signed-off-by: 5kbpers --- go.mod | 4 +- go.sum | 17 ++++- pkg/restore/backoff.go | 111 ++++++++++++++++++++++++++++++++ pkg/restore/client.go | 37 +++++------ pkg/restore/import.go | 143 ++++++++++++++++++----------------------- pkg/restore/util.go | 42 +----------- pkg/utils/retry.go | 32 +++++++++ 7 files changed, 241 insertions(+), 145 deletions(-) create mode 100644 pkg/restore/backoff.go create mode 100644 pkg/utils/retry.go diff --git a/go.mod b/go.mod index 9951c2922..a237987e9 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,14 @@ require ( github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8 // indirect + github.com/fatih/color v1.9.0 // indirect github.com/fsouza/fake-gcs-server v1.15.0 github.com/go-sql-driver/mysql v1.4.1 github.com/gogo/protobuf v1.3.1 github.com/golang/snappy v0.0.1 // indirect github.com/google/btree v1.0.0 github.com/google/uuid v1.1.1 + github.com/mattn/go-runewidth v0.0.7 // indirect github.com/onsi/ginkgo v1.10.3 // indirect github.com/onsi/gomega v1.7.1 // indirect github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4 @@ -35,7 +37,7 @@ require ( golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect golang.org/x/net v0.0.0-20191011234655-491137f69257 // indirect golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 - golang.org/x/tools v0.0.0-20191213032237-7093a17b0467 // indirect + golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042 // indirect google.golang.org/api v0.14.0 google.golang.org/grpc v1.25.1 ) diff --git a/go.sum b/go.sum index 085e00355..a61976b2b 100644 --- a/go.sum +++ b/go.sum @@ -86,6 +86,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= +github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsouza/fake-gcs-server v1.15.0 h1:ss/ztlt10Y64A5qslmxZKsiqW/i28t5DkRtv6qSFaLQ= @@ -209,12 +211,18 @@ github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.11 h1:FxPOTFNqGkuDUGi3H/qkUbQO4ZiBa2brKq5r0l8TGeM= +github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= +github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o= github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -374,6 +382,7 @@ github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKn github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/yookoala/realpath v1.0.0 h1:7OA9pj4FZd+oZDsyvXWQvjn5oBdcHRTV44PpdMSuImQ= github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= @@ -415,6 +424,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190909091759-094676da4a83/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -442,6 +452,7 @@ golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCc golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -488,6 +499,7 @@ golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190909082730-f460065e899a h1:mIzbOulag9/gXacgxKlFVwpCOWSfBT3/pDyyCwGA9as= golang.org/x/sys v0.0.0-20190909082730-f460065e899a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191210023423-ac6580df4449 h1:gSbV7h1NRL2G1xTg/owz62CST1oJBmxy4QpMMregXVQ= golang.org/x/sys v0.0.0-20191210023423-ac6580df4449/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= @@ -525,9 +537,10 @@ golang.org/x/tools v0.0.0-20191107010934-f79515f33823/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2 h1:EtTFh6h4SAKemS+CURDMTDIANuduG5zKEXShyy18bGA= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191213032237-7093a17b0467 h1:Jybbe55FT+YYZIJGWmJIA4ZGcglFuZOduakIW3+gHXY= -golang.org/x/tools v0.0.0-20191213032237-7093a17b0467/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042 h1:BKiPVwWbEdmAh+5CBwk13CYeVJQRDJpDnKgDyMOGz9M= +golang.org/x/tools v0.0.0-20200107184032-11e9d9cc0042/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go new file mode 100644 index 000000000..db1dfff51 --- /dev/null +++ b/pkg/restore/backoff.go @@ -0,0 +1,111 @@ +package restore + +import ( + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "go.uber.org/zap" + + "github.com/pingcap/br/pkg/utils" +) + +var ( + errNotLeader = errors.New("not leader") + errKeyNotInRegion = errors.New("key not in region") + errResp = errors.New("response error") + errRewriteRuleNotFound = errors.New("rewrite rule not found") + errRangeIsEmpty = errors.New("range is empty") + errGrpc = errors.New("gRPC error") + + // TODO: add `error` field to `DownloadResponse` for distinguish the errors of gRPC + // and the errors of request + errBadFormat = errors.New("bad format") + errWrongKeyPrefix = errors.New("wrong key prefix") + errFileCorrupted = errors.New("file corrupted") + errCannotRead = errors.New("cannot read externel storage") +) + +const ( + importSSTRetryTimes = 16 + importSSTWaitInterval = 10 * time.Millisecond + importSSTMaxWaitInterval = 1 * time.Second + + downloadSSTRetryTimes = 8 + downloadSSTWaitInterval = 10 * time.Millisecond + downloadSSTMaxWaitInterval = 1 * time.Second + + resetTsRetryTime = 16 + resetTSWaitInterval = 50 * time.Millisecond + resetTSMaxWaitInterval = 500 * time.Millisecond +) + +type importerBackoffer struct { + attempt int + delayTime time.Duration + maxDelayTime time.Duration +} + +func newImportSSTBackoffer() utils.Backoffer { + return &importerBackoffer{ + attempt: importSSTRetryTimes, + delayTime: importSSTWaitInterval, + maxDelayTime: importSSTMaxWaitInterval, + } +} + +func newDownloadSSTBackoffer() utils.Backoffer { + return &importerBackoffer{ + attempt: downloadSSTRetryTimes, + delayTime: downloadSSTWaitInterval, + maxDelayTime: downloadSSTMaxWaitInterval, + } +} + +func (bo *importerBackoffer) NextBackoff(err error) time.Duration { + switch err { + case errResp, errGrpc: + bo.delayTime = 2 * bo.delayTime + bo.attempt-- + default: + // Don't continue to retry + bo.delayTime = 0 + bo.attempt = 0 + log.Warn("undetemined error, stop to retry", zap.Error(err)) + } + if bo.delayTime > bo.maxDelayTime { + return bo.maxDelayTime + } + return bo.delayTime +} + +func (bo *importerBackoffer) Attempt() int { + return bo.attempt +} + +type resetTSBackoffer struct { + attempt int + delayTime time.Duration + maxDelayTime time.Duration +} + +func newResetTSBackoffer() utils.Backoffer { + return &resetTSBackoffer{ + attempt: resetTsRetryTime, + delayTime: resetTSWaitInterval, + maxDelayTime: resetTSMaxWaitInterval, + } +} + +func (bo *resetTSBackoffer) NextBackoff(err error) time.Duration { + bo.delayTime = 2 * bo.delayTime + bo.attempt-- + if bo.delayTime > bo.maxDelayTime { + return bo.maxDelayTime + } + return bo.delayTime +} + +func (bo *resetTSBackoffer) Attempt() int { + return bo.attempt +} diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 3030ba857..0befbc75f 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -26,15 +26,9 @@ import ( "github.com/pingcap/br/pkg/utils" ) -const ( - resetTsRetryTime = 16 - resetTSWaitInterval = 50 * time.Millisecond - resetTSMaxWaitInterval = 500 * time.Millisecond - - // defaultChecksumConcurrency is the default number of the concurrent - // checksum tasks. - defaultChecksumConcurrency = 64 -) +// defaultChecksumConcurrency is the default number of the concurrent +// checksum tasks. +const defaultChecksumConcurrency = 64 // Client sends requests to restore files type Client struct { @@ -138,13 +132,10 @@ func (rc *Client) ResetTS(pdAddrs []string) error { restoreTS := rc.backupMeta.GetEndVersion() log.Info("reset pd timestamp", zap.Uint64("ts", restoreTS)) i := 0 - return withRetry(func() error { + return utils.WithRetry(func() error { idx := i % len(pdAddrs) return utils.ResetTS(pdAddrs[idx], restoreTS) - }, func(e error) bool { - i++ - return true - }, resetTsRetryTime, resetTSWaitInterval, resetTSMaxWaitInterval) + }, newResetTSBackoffer()) } // GetDatabases returns all databases. @@ -237,8 +228,10 @@ func (rc *Client) RestoreTable( start := time.Now() defer func() { elapsed := time.Since(start) - log.Info("restore table", - zap.Stringer("table", table.Schema.Name), zap.Duration("take", elapsed)) + if err == nil { + log.Info("Restore Table", + zap.Stringer("table", table.Schema.Name), zap.Duration("take", elapsed)) + } key := fmt.Sprintf("%s.%s", table.Db.Name.String(), table.Schema.Name.String()) if err != nil { summary.CollectFailureUnit(key, err) @@ -304,8 +297,10 @@ func (rc *Client) RestoreDatabase( ) (err error) { start := time.Now() defer func() { - elapsed := time.Since(start) - log.Info("Restore Database", zap.Stringer("db", db.Schema.Name), zap.Duration("take", elapsed)) + if err == nil { + elapsed := time.Since(start) + log.Info("Restore Database", zap.Stringer("db", db.Schema.Name), zap.Duration("take", elapsed)) + } }() errCh := make(chan error, len(db.Tables)) wg := new(sync.WaitGroup) @@ -340,8 +335,10 @@ func (rc *Client) RestoreAll( ) (err error) { start := time.Now() defer func() { - elapsed := time.Since(start) - log.Info("Restore All", zap.Duration("take", elapsed)) + if err == nil { + elapsed := time.Since(start) + log.Info("Restore All", zap.Duration("take", elapsed)) + } }() errCh := make(chan error, len(rc.databases)) wg := new(sync.WaitGroup) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 77273ebab..fadcacc15 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -2,6 +2,7 @@ package restore import ( "context" + "strings" "sync" "time" @@ -16,25 +17,10 @@ import ( "google.golang.org/grpc" "github.com/pingcap/br/pkg/summary" + "github.com/pingcap/br/pkg/utils" ) -var ( - errNotLeader = errors.New("not leader") - errEpochNotMatch = errors.New("epoch not match") - errRewriteRuleNotFound = errors.New("rewrite rule not found") - errRangeIsEmpty = errors.New("range is empty") -) - -const ( - importScanResgionTime = 10 * time.Second - importFileRetryTimes = 16 - importFileWaitInterval = 10 * time.Millisecond - importFileMaxWaitInterval = 1 * time.Second - - downloadSSTRetryTimes = 8 - downloadSSTWaitInterval = 10 * time.Millisecond - downloadSSTMaxWaitInterval = 1 * time.Second -) +const importScanResgionTime = 10 * time.Second // ImporterClient is used to import a file to TiKV type ImporterClient interface { @@ -172,9 +158,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul log.Debug("rewrite file keys", zap.Stringer("file", file), zap.Binary("startKey", startKey), - zap.Binary("endKey", endKey), - ) - err = withRetry(func() error { + zap.Binary("endKey", endKey)) + err = utils.WithRetry(func() error { ctx, cancel := context.WithTimeout(importer.ctx, importScanResgionTime) defer cancel() // Scan regions covered by the file range @@ -185,63 +170,49 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos))) // Try to download and ingest the file in every region for _, regionInfo := range regionInfos { - var downloadMeta *import_sstpb.SSTMeta info := regionInfo // Try to download file. - err = withRetry(func() error { - var err2 error - var isEmpty bool - downloadMeta, isEmpty, err2 = importer.downloadSST(info, file, rewriteRules) - if err2 != nil { - if err != errRewriteRuleNotFound { - log.Warn("download file failed", - zap.Stringer("file", file), - zap.Stringer("region", info.Region), - zap.Binary("startKey", startKey), - zap.Binary("endKey", endKey), - zap.Error(err2), - ) - } - return err2 - } - if isEmpty { - log.Info( - "file don't have any key in this region, skip it", - zap.Stringer("file", file), - zap.Stringer("region", info.Region), - ) - return errRangeIsEmpty - } - return nil - }, func(e error) bool { - // Scan regions may return some regions which cannot match any rewrite rule, - // like [t{tableID}, t{tableID}_r), those regions should be skipped - return e != errRewriteRuleNotFound && e != errRangeIsEmpty - }, downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval) - if err != nil { - if err == errRewriteRuleNotFound || err == errRangeIsEmpty { + var downloadMeta *import_sstpb.SSTMeta + err1 = utils.WithRetry(func() error { + var e error + downloadMeta, e = importer.downloadSST(info, file, rewriteRules) + return e + }, newDownloadSSTBackoffer()) + if err1 != nil { + if err1 == errRewriteRuleNotFound || err1 == errRangeIsEmpty { // Skip this region continue } - return err + log.Error("download file failed", + zap.Stringer("file", file), + zap.Stringer("region", info.Region), + zap.Binary("startKey", startKey), + zap.Binary("endKey", endKey), + zap.Error(err1)) + return err1 } - err = importer.ingestSST(downloadMeta, info) - if err != nil { - log.Warn("ingest file failed", + err1 = importer.ingestSST(downloadMeta, info) + // If error is `NotLeader`, update the region info and retry + for err1 == errNotLeader { + info, err1 = importer.metaClient.GetRegion(ctx, info.Region.GetStartKey()) + if err1 != nil { + break + } + err1 = importer.ingestSST(downloadMeta, info) + } + if err1 != nil { + log.Error("ingest file failed", zap.Stringer("file", file), zap.Stringer("range", downloadMeta.GetRange()), zap.Stringer("region", info.Region), - zap.Error(err), - ) - return err + zap.Error(err1)) + return err1 } summary.CollectSuccessUnit(summary.TotalKV, file.TotalKvs) summary.CollectSuccessUnit(summary.TotalBytes, file.TotalBytes) } return nil - }, func(e error) bool { - return true - }, importFileRetryTimes, importFileWaitInterval, importFileMaxWaitInterval) + }, newImportSSTBackoffer()) return err } @@ -257,33 +228,25 @@ func (importer *FileImporter) downloadSST( regionInfo *RegionInfo, file *backup.File, rewriteRules *RewriteRules, -) (*import_sstpb.SSTMeta, bool, error) { +) (*import_sstpb.SSTMeta, error) { id, err := uuid.New().MarshalBinary() if err != nil { - return nil, true, errors.Trace(err) + return nil, errors.Trace(err) } // Assume one region reflects to one rewrite rule _, key, err := codec.DecodeBytes(regionInfo.Region.GetStartKey()) if err != nil { - return nil, true, err + return nil, err } regionRule := matchNewPrefix(key, rewriteRules) if regionRule == nil { - log.Debug("cannot find rewrite rule, skip region", - zap.Stringer("region", regionInfo.Region), - zap.Array("tableRule", rules(rewriteRules.Table)), - zap.Array("dataRule", rules(rewriteRules.Data)), - zap.Binary("key", key), - ) - return nil, true, errRewriteRuleNotFound + return nil, errRewriteRuleNotFound } rule := import_sstpb.RewriteRule{ OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()), NewKeyPrefix: encodeKeyPrefix(regionRule.GetNewKeyPrefix()), } sstMeta := getSSTMetaFromFile(id, file, regionInfo.Region, &rule) - sstMeta.RegionId = regionInfo.Region.GetId() - sstMeta.RegionEpoch = regionInfo.Region.GetRegionEpoch() req := &import_sstpb.DownloadRequest{ Sst: sstMeta, StorageBackend: importer.backend, @@ -298,15 +261,15 @@ func (importer *FileImporter) downloadSST( for _, peer := range regionInfo.Region.GetPeers() { resp, err = importer.importClient.DownloadSST(importer.ctx, peer.GetStoreId(), req) if err != nil { - return nil, true, err + return nil, extractDownloadSSTError(err) } if resp.GetIsEmpty() { - return &sstMeta, true, nil + return nil, errRangeIsEmpty } } sstMeta.Range.Start = truncateTS(resp.Range.GetStart()) sstMeta.Range.End = truncateTS(resp.Range.GetEnd()) - return &sstMeta, false, nil + return &sstMeta, nil } func (importer *FileImporter) ingestSST( @@ -333,13 +296,29 @@ func (importer *FileImporter) ingestSST( } respErr := resp.GetError() if respErr != nil { - if respErr.EpochNotMatch != nil { - return errEpochNotMatch + log.Debug("ingest sst resp error", zap.Stringer("error", respErr)) + if respErr.GetKeyNotInRegion() != nil { + return errKeyNotInRegion } - if respErr.NotLeader != nil { + if respErr.GetNotLeader() != nil { return errNotLeader } - return errors.Errorf("ingest failed: %v", respErr) + return errResp } return nil } + +func extractDownloadSSTError(e error) error { + err := errGrpc + switch { + case strings.Contains(e.Error(), "bad format"): + err = errBadFormat + case strings.Contains(e.Error(), "wrong prefix"): + err = errWrongKeyPrefix + case strings.Contains(e.Error(), "corrupted"): + err = errFileCorrupted + case strings.Contains(e.Error(), "Cannot read"): + err = errCannotRead + } + return err +} diff --git a/pkg/restore/util.go b/pkg/restore/util.go index a2e9e3e38..4687100cc 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -32,15 +32,6 @@ func (fs files) MarshalLogArray(arr zapcore.ArrayEncoder) error { return nil } -type rules []*import_sstpb.RewriteRule - -func (rs rules) MarshalLogArray(arr zapcore.ArrayEncoder) error { - for i := range rs { - arr.AppendString(rs[i].String()) - } - return nil -} - // idAllocator always returns a specified ID type idAllocator struct { id int64 @@ -158,40 +149,11 @@ func getSSTMetaFromFile( Start: rangeStart, End: rangeEnd, }, + RegionId: region.GetId(), + RegionEpoch: region.GetRegionEpoch(), } } -type retryableFunc func() error -type continueFunc func(error) bool - -func withRetry( - retryableFunc retryableFunc, - continueFunc continueFunc, - attempts uint, - delayTime time.Duration, - maxDelayTime time.Duration, -) error { - var lastErr error - for i := uint(0); i < attempts; i++ { - err := retryableFunc() - if err != nil { - lastErr = err - // If this is the last attempt, do not wait - if !continueFunc(err) || i == attempts-1 { - break - } - delayTime = 2 * delayTime - if delayTime > maxDelayTime { - delayTime = maxDelayTime - } - time.Sleep(delayTime) - } else { - return nil - } - } - return lastErr -} - // ValidateFileRanges checks and returns the ranges of the files. func ValidateFileRanges( files []*backup.File, diff --git a/pkg/utils/retry.go b/pkg/utils/retry.go new file mode 100644 index 000000000..b63f1f79c --- /dev/null +++ b/pkg/utils/retry.go @@ -0,0 +1,32 @@ +package utils + +import "time" + +// RetryableFunc presents a retryable opreation +type RetryableFunc func() error + +// Backoffer implements a backoff policy for retrying operations +type Backoffer interface { + // NextBackoff returns a duration to wait before retrying again + NextBackoff(err error) time.Duration + // Attempt returns the remain attempt times + Attempt() int +} + +// WithRetry retrys a given operation with a backoff policy +func WithRetry( + retryableFunc RetryableFunc, + backoffer Backoffer, +) error { + var lastErr error + for backoffer.Attempt() > 0 { + err := retryableFunc() + if err != nil { + lastErr = err + time.Sleep(backoffer.NextBackoff(err)) + } else { + return nil + } + } + return lastErr +} From cee84f1bc6131480a507c3a3b92f24ca7a005c7e Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 4 Feb 2020 17:19:54 +0800 Subject: [PATCH 02/12] unit test Signed-off-by: 5kbpers --- pkg/restore/backoff.go | 8 ++++-- pkg/restore/backoff_test.go | 57 +++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 pkg/restore/backoff_test.go diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go index db1dfff51..76161e73d 100644 --- a/pkg/restore/backoff.go +++ b/pkg/restore/backoff.go @@ -67,11 +67,15 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration { case errResp, errGrpc: bo.delayTime = 2 * bo.delayTime bo.attempt-- + case errRangeIsEmpty, errRewriteRuleNotFound: + // Excepted error, finish the operation + bo.delayTime = 0 + bo.attempt = 0 default: - // Don't continue to retry + // Unexcepted error bo.delayTime = 0 bo.attempt = 0 - log.Warn("undetemined error, stop to retry", zap.Error(err)) + log.Warn("unexcepted error, stop to retry", zap.Error(err)) } if bo.delayTime > bo.maxDelayTime { return bo.maxDelayTime diff --git a/pkg/restore/backoff_test.go b/pkg/restore/backoff_test.go new file mode 100644 index 000000000..084660020 --- /dev/null +++ b/pkg/restore/backoff_test.go @@ -0,0 +1,57 @@ +package restore + +import ( + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/tidb/util/testleak" + + "github.com/pingcap/br/pkg/utils" +) + +var _ = Suite(&testBackofferSuite{}) + +type testBackofferSuite struct { + mock *utils.MockCluster +} + +func (s *testBackofferSuite) SetUpSuite(c *C) { + var err error + s.mock, err = utils.NewMockCluster() + c.Assert(err, IsNil) +} + +func (s *testBackofferSuite) TearDownSuite(c *C) { + testleak.AfterTest(c)() +} + +func (s *testBackofferSuite) TestImporterBackoffer(c *C) { + var counter int + err := utils.WithRetry(func() error { + defer func() { counter++ }() + switch counter { + case 0: + return errGrpc + case 1: + return errResp + case 2: + return errRangeIsEmpty + } + return nil + }, newImportSSTBackoffer()) + c.Assert(counter, Equals, 3) + c.Assert(err, Equals, errRangeIsEmpty) + + counter = 0 + backoffer := importerBackoffer{ + attempt: 10, + delayTime: time.Nanosecond, + maxDelayTime: time.Nanosecond, + } + err = utils.WithRetry(func() error { + defer func() { counter++ }() + return errResp + }, &backoffer) + c.Assert(counter, Equals, 10) + c.Assert(err, Equals, errResp) +} From 2fdec58b530f1c3c200fda0c37d398fc9c98242e Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 12:08:29 +0800 Subject: [PATCH 03/12] address comments Signed-off-by: 5kbpers --- pkg/restore/backoff.go | 20 ++++++++++---------- pkg/restore/backoff_test.go | 5 +++-- pkg/restore/client.go | 2 +- pkg/restore/import.go | 4 ++-- pkg/utils/retry.go | 13 +++++++++++-- 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go index 76161e73d..d614e0129 100644 --- a/pkg/restore/backoff.go +++ b/pkg/restore/backoff.go @@ -11,19 +11,19 @@ import ( ) var ( - errNotLeader = errors.New("not leader") - errKeyNotInRegion = errors.New("key not in region") - errResp = errors.New("response error") - errRewriteRuleNotFound = errors.New("rewrite rule not found") - errRangeIsEmpty = errors.New("range is empty") - errGrpc = errors.New("gRPC error") + errNotLeader = errors.NewNoStackError("not leader") + errKeyNotInRegion = errors.NewNoStackError("key not in region") + errResp = errors.NewNoStackError("response error") + errRewriteRuleNotFound = errors.NewNoStackError("rewrite rule not found") + errRangeIsEmpty = errors.NewNoStackError("range is empty") + errGrpc = errors.NewNoStackError("gRPC error") // TODO: add `error` field to `DownloadResponse` for distinguish the errors of gRPC // and the errors of request - errBadFormat = errors.New("bad format") - errWrongKeyPrefix = errors.New("wrong key prefix") - errFileCorrupted = errors.New("file corrupted") - errCannotRead = errors.New("cannot read externel storage") + errBadFormat = errors.NewNoStackError("bad format") + errWrongKeyPrefix = errors.NewNoStackError("wrong key prefix") + errFileCorrupted = errors.NewNoStackError("file corrupted") + errCannotRead = errors.NewNoStackError("cannot read externel storage") ) const ( diff --git a/pkg/restore/backoff_test.go b/pkg/restore/backoff_test.go index 084660020..537f0980c 100644 --- a/pkg/restore/backoff_test.go +++ b/pkg/restore/backoff_test.go @@ -1,6 +1,7 @@ package restore import ( + "context" "time" . "github.com/pingcap/check" @@ -27,7 +28,7 @@ func (s *testBackofferSuite) TearDownSuite(c *C) { func (s *testBackofferSuite) TestImporterBackoffer(c *C) { var counter int - err := utils.WithRetry(func() error { + err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() switch counter { case 0: @@ -48,7 +49,7 @@ func (s *testBackofferSuite) TestImporterBackoffer(c *C) { delayTime: time.Nanosecond, maxDelayTime: time.Nanosecond, } - err = utils.WithRetry(func() error { + err = utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() return errResp }, &backoffer) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index 0befbc75f..ab314bb0a 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -132,7 +132,7 @@ func (rc *Client) ResetTS(pdAddrs []string) error { restoreTS := rc.backupMeta.GetEndVersion() log.Info("reset pd timestamp", zap.Uint64("ts", restoreTS)) i := 0 - return utils.WithRetry(func() error { + return utils.WithRetry(rc.ctx, func() error { idx := i % len(pdAddrs) return utils.ResetTS(pdAddrs[idx], restoreTS) }, newResetTSBackoffer()) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index fadcacc15..f036f62da 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -159,7 +159,7 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul zap.Stringer("file", file), zap.Binary("startKey", startKey), zap.Binary("endKey", endKey)) - err = utils.WithRetry(func() error { + err = utils.WithRetry(importer.ctx, func() error { ctx, cancel := context.WithTimeout(importer.ctx, importScanResgionTime) defer cancel() // Scan regions covered by the file range @@ -173,7 +173,7 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul info := regionInfo // Try to download file. var downloadMeta *import_sstpb.SSTMeta - err1 = utils.WithRetry(func() error { + err1 = utils.WithRetry(importer.ctx, func() error { var e error downloadMeta, e = importer.downloadSST(info, file, rewriteRules) return e diff --git a/pkg/utils/retry.go b/pkg/utils/retry.go index b63f1f79c..3116ceaac 100644 --- a/pkg/utils/retry.go +++ b/pkg/utils/retry.go @@ -1,6 +1,9 @@ package utils -import "time" +import ( + "context" + "time" +) // RetryableFunc presents a retryable opreation type RetryableFunc func() error @@ -15,6 +18,7 @@ type Backoffer interface { // WithRetry retrys a given operation with a backoff policy func WithRetry( + ctx context.Context, retryableFunc RetryableFunc, backoffer Backoffer, ) error { @@ -23,7 +27,12 @@ func WithRetry( err := retryableFunc() if err != nil { lastErr = err - time.Sleep(backoffer.NextBackoff(err)) + select { + case <-ctx.Done(): + return lastErr + default: + time.Sleep(backoffer.NextBackoff(err)) + } } else { return nil } From 985676bebda0303017e3b2dc52f759c1f1c11831 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 12:52:31 +0800 Subject: [PATCH 04/12] fix region epoch error Signed-off-by: 5kbpers --- pkg/restore/backoff.go | 1 + pkg/restore/import.go | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go index d614e0129..bd6699817 100644 --- a/pkg/restore/backoff.go +++ b/pkg/restore/backoff.go @@ -12,6 +12,7 @@ import ( var ( errNotLeader = errors.NewNoStackError("not leader") + errEpochNotMatch = errors.NewNoStackError("epoch not match") errKeyNotInRegion = errors.NewNoStackError("key not in region") errResp = errors.NewNoStackError("response error") errRewriteRuleNotFound = errors.NewNoStackError("rewrite rule not found") diff --git a/pkg/restore/import.go b/pkg/restore/import.go index f036f62da..5597d1358 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -194,11 +194,16 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul err1 = importer.ingestSST(downloadMeta, info) // If error is `NotLeader`, update the region info and retry for err1 == errNotLeader { - info, err1 = importer.metaClient.GetRegion(ctx, info.Region.GetStartKey()) + var newInfo *RegionInfo + newInfo, err1 = importer.metaClient.GetRegion(ctx, info.Region.GetStartKey()) if err1 != nil { break } - err1 = importer.ingestSST(downloadMeta, info) + if !checkRegionEpoch(newInfo, info) { + err1 = errEpochNotMatch + break + } + err1 = importer.ingestSST(downloadMeta, newInfo) } if err1 != nil { log.Error("ingest file failed", @@ -308,6 +313,15 @@ func (importer *FileImporter) ingestSST( return nil } +func checkRegionEpoch(new, old *RegionInfo) bool { + if new.Region.GetId() == old.Region.GetId() && + new.Region.GetRegionEpoch().GetVersion() == old.Region.GetRegionEpoch().GetVersion() && + new.Region.GetRegionEpoch().GetConfVer() == old.Region.GetRegionEpoch().GetConfVer() { + return true + } + return false +} + func extractDownloadSSTError(e error) error { err := errGrpc switch { From 62803024a272e308b077ed05f708b3cf6af45e67 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 13:13:05 +0800 Subject: [PATCH 05/12] address comments Signed-off-by: 5kbpers --- pkg/restore/import.go | 2 ++ pkg/utils/retry.go | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 5597d1358..dc7003d20 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -194,6 +194,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul err1 = importer.ingestSST(downloadMeta, info) // If error is `NotLeader`, update the region info and retry for err1 == errNotLeader { + log.Debug("ingest sst returns not leader error, retry it", + zap.Stringer("region", info.Region)) var newInfo *RegionInfo newInfo, err1 = importer.metaClient.GetRegion(ctx, info.Region.GetStartKey()) if err1 != nil { diff --git a/pkg/utils/retry.go b/pkg/utils/retry.go index 3116ceaac..a8f446764 100644 --- a/pkg/utils/retry.go +++ b/pkg/utils/retry.go @@ -30,8 +30,7 @@ func WithRetry( select { case <-ctx.Done(): return lastErr - default: - time.Sleep(backoffer.NextBackoff(err)) + case <-time.After(backoffer.NextBackoff(err)): } } else { return nil From b62743890b8c1e49f210d25c78de23ac9e1260a9 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 16:51:33 +0800 Subject: [PATCH 06/12] remove `Restore*` Signed-off-by: 5kbpers --- pkg/restore/client.go | 117 +++++---------------------------------- pkg/task/restore.go | 2 +- tests/br_full_ddl/run.sh | 2 +- 3 files changed, 15 insertions(+), 106 deletions(-) diff --git a/pkg/restore/client.go b/pkg/restore/client.go index ab314bb0a..5402d78bc 100644 --- a/pkg/restore/client.go +++ b/pkg/restore/client.go @@ -2,7 +2,6 @@ package restore import ( "context" - "fmt" "math" "sync" "time" @@ -219,9 +218,9 @@ func (rc *Client) setSpeedLimit() error { return nil } -// RestoreTable tries to restore the data of a table. -func (rc *Client) RestoreTable( - table *utils.Table, +// RestoreFiles tries to restore the files. +func (rc *Client) RestoreFiles( + files []*backup.File, rewriteRules *RewriteRules, updateCh chan<- struct{}, ) (err error) { @@ -229,23 +228,18 @@ func (rc *Client) RestoreTable( defer func() { elapsed := time.Since(start) if err == nil { - log.Info("Restore Table", - zap.Stringer("table", table.Schema.Name), zap.Duration("take", elapsed)) - } - key := fmt.Sprintf("%s.%s", table.Db.Name.String(), table.Schema.Name.String()) - if err != nil { - summary.CollectFailureUnit(key, err) + log.Info("Restore Files", + zap.Int("files", len(files)), zap.Duration("take", elapsed)) + summary.CollectSuccessUnit("files", elapsed) } else { - summary.CollectSuccessUnit(key, elapsed) + summary.CollectFailureUnit("files", err) } }() - log.Debug("start to restore table", - zap.Stringer("table", table.Schema.Name), - zap.Stringer("db", table.Db.Name), - zap.Array("files", files(table.Files)), + log.Debug("start to restore files", + zap.Int("files", len(files)), ) - errCh := make(chan error, len(table.Files)) + errCh := make(chan error, len(files)) wg := new(sync.WaitGroup) defer close(errCh) err = rc.setSpeedLimit() @@ -253,7 +247,7 @@ func (rc *Client) RestoreTable( return err } - for _, file := range table.Files { + for _, file := range files { wg.Add(1) fileReplica := file rc.workerPool.Apply( @@ -267,103 +261,18 @@ func (rc *Client) RestoreTable( } }) } - for range table.Files { + for range files { err := <-errCh if err != nil { rc.cancel() wg.Wait() log.Error( - "restore table failed", - zap.Stringer("table", table.Schema.Name), - zap.Stringer("db", table.Db.Name), + "restore files failed", zap.Error(err), ) return err } } - log.Info( - "finish to restore table", - zap.Stringer("table", table.Schema.Name), - zap.Stringer("db", table.Db.Name), - ) - return nil -} - -// RestoreDatabase tries to restore the data of a database -func (rc *Client) RestoreDatabase( - db *utils.Database, - rewriteRules *RewriteRules, - updateCh chan<- struct{}, -) (err error) { - start := time.Now() - defer func() { - if err == nil { - elapsed := time.Since(start) - log.Info("Restore Database", zap.Stringer("db", db.Schema.Name), zap.Duration("take", elapsed)) - } - }() - errCh := make(chan error, len(db.Tables)) - wg := new(sync.WaitGroup) - defer close(errCh) - for _, table := range db.Tables { - wg.Add(1) - tblReplica := table - rc.tableWorkerPool.Apply(func() { - defer wg.Done() - select { - case <-rc.ctx.Done(): - errCh <- nil - case errCh <- rc.RestoreTable( - tblReplica, rewriteRules, updateCh): - } - }) - } - for range db.Tables { - err = <-errCh - if err != nil { - wg.Wait() - return err - } - } - return nil -} - -// RestoreAll tries to restore all the data of backup files. -func (rc *Client) RestoreAll( - rewriteRules *RewriteRules, - updateCh chan<- struct{}, -) (err error) { - start := time.Now() - defer func() { - if err == nil { - elapsed := time.Since(start) - log.Info("Restore All", zap.Duration("take", elapsed)) - } - }() - errCh := make(chan error, len(rc.databases)) - wg := new(sync.WaitGroup) - defer close(errCh) - for _, db := range rc.databases { - wg.Add(1) - dbReplica := db - rc.tableWorkerPool.Apply(func() { - defer wg.Done() - select { - case <-rc.ctx.Done(): - errCh <- nil - case errCh <- rc.RestoreDatabase( - dbReplica, rewriteRules, updateCh): - } - }) - } - - for range rc.databases { - err = <-errCh - if err != nil { - wg.Wait() - return err - } - } return nil } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index f2f3caf43..a56a1d6da 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -139,7 +139,7 @@ func RunRestore(c context.Context, cmdName string, cfg *RestoreConfig) error { if err != nil { return err } - err = client.RestoreAll(rewriteRules, updateCh) + err = client.RestoreFiles(files, rewriteRules, updateCh) // always run the post-work even on error, so we don't stuck in the import mode or emptied schedulers postErr := restorePostWork(ctx, client, mgr, removedSchedulers) diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh index 3db1ecd60..1e40415d7 100755 --- a/tests/br_full_ddl/run.sh +++ b/tests/br_full_ddl/run.sh @@ -28,7 +28,7 @@ for i in $(seq $DDL_COUNT); do run_sql "USE $DB; ALTER TABLE $TABLE ADD INDEX (FIELD$i);" done -for i in $(sql $DDL_COUNT); do +for i in $(seq $DDL_COUNT); do if (( RANDOM % 2 )); then run_sql "USE $DB; ALTER TABLE $TABLE DROP INDEX FIELD$i;" fi From 9943e8b3883cab67597227b473244f9ca6cb7d8d Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 16:54:10 +0800 Subject: [PATCH 07/12] address lint Signed-off-by: 5kbpers --- pkg/restore/util.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 8fa650dbd..63ee92969 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -17,22 +17,12 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "github.com/pingcap/br/pkg/summary" ) var recordPrefixSep = []byte("_r") -type files []*backup.File - -func (fs files) MarshalLogArray(arr zapcore.ArrayEncoder) error { - for i := range fs { - arr.AppendString(fs[i].String()) - } - return nil -} - // idAllocator always returns a specified ID type idAllocator struct { id int64 From bd7a31ee300c7dae7addb906de67651aa5f2d8f9 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 18:28:29 +0800 Subject: [PATCH 08/12] add debug log Signed-off-by: 5kbpers --- pkg/restore/split.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/restore/split.go b/pkg/restore/split.go index 31b23a60f..3248fdd0d 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -111,7 +111,7 @@ SplitRegions: } time.Sleep(interval) if i > 3 { - log.Warn("splitting regions failed, retry it", zap.Error(err)) + log.Warn("splitting regions failed, retry it", zap.Error(err), zap.ByteStrings("keys", keys)) } continue SplitRegions } @@ -259,6 +259,7 @@ func getSplitKeys(rewriteRules *RewriteRules, ranges []Range, regions []*RegionI splitKeys = make([][]byte, 0, 1) } splitKeyMap[region.Region.GetId()] = append(splitKeys, key) + log.Debug("get key for split region", zap.Binary("key", key), zap.Stringer("region", region.Region)) } } return splitKeyMap From e0f708bf52b77108a820f669a59457ff5d06eed2 Mon Sep 17 00:00:00 2001 From: 5kbpers <20279863+5kbpers@users.noreply.github.com> Date: Tue, 11 Feb 2020 18:31:28 +0800 Subject: [PATCH 09/12] Apply suggestions from code review Co-Authored-By: kennytm --- pkg/restore/backoff.go | 2 +- pkg/restore/import.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go index bd6699817..b5c3ca373 100644 --- a/pkg/restore/backoff.go +++ b/pkg/restore/backoff.go @@ -64,7 +64,7 @@ func newDownloadSSTBackoffer() utils.Backoffer { } func (bo *importerBackoffer) NextBackoff(err error) time.Duration { - switch err { + switch errors.Cause(err) { case errResp, errGrpc: bo.delayTime = 2 * bo.delayTime bo.attempt-- diff --git a/pkg/restore/import.go b/pkg/restore/import.go index dc7003d20..b08e2bb1e 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -193,7 +193,7 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul } err1 = importer.ingestSST(downloadMeta, info) // If error is `NotLeader`, update the region info and retry - for err1 == errNotLeader { + for errors.Cause(err1) == errNotLeader { log.Debug("ingest sst returns not leader error, retry it", zap.Stringer("region", info.Region)) var newInfo *RegionInfo @@ -247,7 +247,7 @@ func (importer *FileImporter) downloadSST( } regionRule := matchNewPrefix(key, rewriteRules) if regionRule == nil { - return nil, errRewriteRuleNotFound + return nil, errors.Trace(errRewriteRuleNotFound) } rule := import_sstpb.RewriteRule{ OldKeyPrefix: encodeKeyPrefix(regionRule.GetOldKeyPrefix()), From 35e811b4f485d555a23e5f6f368701e4fc6bd2d7 Mon Sep 17 00:00:00 2001 From: 5kbpers <20279863+5kbpers@users.noreply.github.com> Date: Tue, 11 Feb 2020 18:31:59 +0800 Subject: [PATCH 10/12] Update pkg/restore/import.go Co-Authored-By: kennytm --- pkg/restore/import.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index b08e2bb1e..dbf0db77e 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/br/pkg/utils" ) -const importScanResgionTime = 10 * time.Second +const importScanRegionTime = 10 * time.Second // ImporterClient is used to import a file to TiKV type ImporterClient interface { From b4aef673d7ae9b5729305c491b596c5fab8c5492 Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Tue, 11 Feb 2020 18:37:08 +0800 Subject: [PATCH 11/12] fix retry error Signed-off-by: 5kbpers --- pkg/restore/backoff.go | 2 +- pkg/restore/import.go | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go index b5c3ca373..f058b7df7 100644 --- a/pkg/restore/backoff.go +++ b/pkg/restore/backoff.go @@ -65,7 +65,7 @@ func newDownloadSSTBackoffer() utils.Backoffer { func (bo *importerBackoffer) NextBackoff(err error) time.Duration { switch errors.Cause(err) { - case errResp, errGrpc: + case errResp, errGrpc, errEpochNotMatch, errNotLeader: bo.delayTime = 2 * bo.delayTime bo.attempt-- case errRangeIsEmpty, errRewriteRuleNotFound: diff --git a/pkg/restore/import.go b/pkg/restore/import.go index dbf0db77e..cb92a9234 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -160,7 +160,7 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul zap.Binary("startKey", startKey), zap.Binary("endKey", endKey)) err = utils.WithRetry(importer.ctx, func() error { - ctx, cancel := context.WithTimeout(importer.ctx, importScanResgionTime) + ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime) defer cancel() // Scan regions covered by the file range regionInfos, err1 := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0) @@ -197,7 +197,7 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul log.Debug("ingest sst returns not leader error, retry it", zap.Stringer("region", info.Region)) var newInfo *RegionInfo - newInfo, err1 = importer.metaClient.GetRegion(ctx, info.Region.GetStartKey()) + newInfo, err1 = importer.metaClient.GetRegion(importer.ctx, info.Region.GetStartKey()) if err1 != nil { break } @@ -271,7 +271,7 @@ func (importer *FileImporter) downloadSST( return nil, extractDownloadSSTError(err) } if resp.GetIsEmpty() { - return nil, errRangeIsEmpty + return nil, errors.Trace(errRangeIsEmpty) } } sstMeta.Range.Start = truncateTS(resp.Range.GetStart()) @@ -299,18 +299,18 @@ func (importer *FileImporter) ingestSST( log.Debug("download SST", zap.Stringer("sstMeta", sstMeta)) resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req) if err != nil { - return err + return errors.Trace(err) } respErr := resp.GetError() if respErr != nil { log.Debug("ingest sst resp error", zap.Stringer("error", respErr)) if respErr.GetKeyNotInRegion() != nil { - return errKeyNotInRegion + return errors.Trace(errKeyNotInRegion) } if respErr.GetNotLeader() != nil { - return errNotLeader + return errors.Trace(errNotLeader) } - return errResp + return errors.Trace(errResp) } return nil } @@ -336,5 +336,5 @@ func extractDownloadSSTError(e error) error { case strings.Contains(e.Error(), "Cannot read"): err = errCannotRead } - return err + return errors.Trace(err) } From 763f574b35208df78211e910bf1fa3df9e734deb Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Thu, 13 Feb 2020 13:18:26 +0800 Subject: [PATCH 12/12] handle RegionNotFound error Signed-off-by: 5kbpers --- pkg/restore/backoff.go | 1 + pkg/restore/import.go | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/restore/backoff.go b/pkg/restore/backoff.go index f058b7df7..dae14e109 100644 --- a/pkg/restore/backoff.go +++ b/pkg/restore/backoff.go @@ -14,6 +14,7 @@ var ( errNotLeader = errors.NewNoStackError("not leader") errEpochNotMatch = errors.NewNoStackError("epoch not match") errKeyNotInRegion = errors.NewNoStackError("key not in region") + errRegionNotFound = errors.NewNoStackError("region not found") errResp = errors.NewNoStackError("response error") errRewriteRuleNotFound = errors.NewNoStackError("rewrite rule not found") errRangeIsEmpty = errors.NewNoStackError("range is empty") diff --git a/pkg/restore/import.go b/pkg/restore/import.go index cb92a9234..01f8456ef 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -299,6 +299,9 @@ func (importer *FileImporter) ingestSST( log.Debug("download SST", zap.Stringer("sstMeta", sstMeta)) resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req) if err != nil { + if strings.Contains(err.Error(), "RegionNotFound") { + return errors.Trace(errRegionNotFound) + } return errors.Trace(err) } respErr := resp.GetError() @@ -310,7 +313,7 @@ func (importer *FileImporter) ingestSST( if respErr.GetNotLeader() != nil { return errors.Trace(errNotLeader) } - return errors.Trace(errResp) + return errors.Wrap(errResp, respErr.String()) } return nil }