This repository has been archived by the owner on Jul 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 102
restore: speed up retry on not leader #179
Merged
Merged
Changes from 6 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
4a36112
tests: stable cluster start up
overvenus ccda3f2
tests: fix unbound var
overvenus 0bf1894
restore: speed retry on not leader
overvenus 96a6517
address comments
overvenus 663a2d5
tests: add --cacert flag
overvenus d37ba4c
make codecov green
overvenus 60eaa3b
address comments
overvenus File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
codecov: | ||
require_ci_to_pass: yes | ||
|
||
coverage: | ||
status: | ||
project: | ||
default: | ||
# Allow the coverage to drop by 3% | ||
threshold: 3% | ||
patch: off |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -171,23 +171,23 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul | |||||
ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime) | ||||||
defer cancel() | ||||||
// Scan regions covered by the file range | ||||||
regionInfos, err1 := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0) | ||||||
if err1 != nil { | ||||||
return errors.Trace(err1) | ||||||
regionInfos, errScanRegion := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0) | ||||||
if errScanRegion != nil { | ||||||
return errors.Trace(errScanRegion) | ||||||
} | ||||||
log.Debug("scan regions", zap.Stringer("file", file), zap.Int("count", len(regionInfos))) | ||||||
// Try to download and ingest the file in every region | ||||||
for _, regionInfo := range regionInfos { | ||||||
info := regionInfo | ||||||
// Try to download file. | ||||||
var downloadMeta *import_sstpb.SSTMeta | ||||||
err1 = utils.WithRetry(importer.ctx, func() error { | ||||||
errDownload := utils.WithRetry(importer.ctx, func() error { | ||||||
var e error | ||||||
downloadMeta, e = importer.downloadSST(info, file, rewriteRules) | ||||||
return e | ||||||
}, newDownloadSSTBackoffer()) | ||||||
if err1 != nil { | ||||||
if err1 == errRewriteRuleNotFound || err1 == errRangeIsEmpty { | ||||||
if errDownload != nil { | ||||||
if errDownload == errRewriteRuleNotFound || errDownload == errRangeIsEmpty { | ||||||
// Skip this region | ||||||
continue | ||||||
} | ||||||
|
@@ -196,32 +196,68 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul | |||||
zap.Stringer("region", info.Region), | ||||||
zap.Binary("startKey", startKey), | ||||||
zap.Binary("endKey", endKey), | ||||||
zap.Error(err1)) | ||||||
return err1 | ||||||
zap.Error(errDownload)) | ||||||
return errDownload | ||||||
} | ||||||
err1 = importer.ingestSST(downloadMeta, info) | ||||||
// If error is `NotLeader`, update the region info and retry | ||||||
for errors.Cause(err1) == errNotLeader { | ||||||
log.Debug("ingest sst returns not leader error, retry it", | ||||||
zap.Stringer("region", info.Region)) | ||||||
var newInfo *RegionInfo | ||||||
newInfo, err1 = importer.metaClient.GetRegion(importer.ctx, info.Region.GetStartKey()) | ||||||
if err1 != nil { | ||||||
break | ||||||
ingestResp, errIngest := importer.ingestSST(downloadMeta, info) | ||||||
ingestRetry: | ||||||
for errIngest == nil { | ||||||
errPb := ingestResp.GetError() | ||||||
if errPb == nil { | ||||||
// Ingest success | ||||||
break ingestRetry | ||||||
} | ||||||
if !checkRegionEpoch(newInfo, info) { | ||||||
err1 = errEpochNotMatch | ||||||
break | ||||||
var newInfo *RegionInfo | ||||||
switch { | ||||||
case errPb.NotLeader != nil: | ||||||
// If error is `NotLeader`, update the region info and retry | ||||||
if newLeader := errPb.GetNotLeader().GetLeader(); newLeader != nil { | ||||||
newInfo = &RegionInfo{ | ||||||
Leader: newLeader, | ||||||
Region: info.Region, | ||||||
} | ||||||
} else { | ||||||
// Slow path, get region from PD | ||||||
newInfo, errIngest = importer.metaClient.GetRegion( | ||||||
importer.ctx, info.Region.GetStartKey()) | ||||||
if errIngest != nil { | ||||||
break ingestRetry | ||||||
} | ||||||
} | ||||||
log.Debug("ingest sst returns not leader error, retry it", | ||||||
zap.Stringer("region", info.Region), | ||||||
zap.Stringer("newLeader", newInfo.Leader)) | ||||||
|
||||||
if !checkRegionEpoch(newInfo, info) { | ||||||
errIngest = errEpochNotMatch | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
break ingestRetry | ||||||
} | ||||||
ingestResp, errIngest = importer.ingestSST(downloadMeta, newInfo) | ||||||
case errPb.EpochNotMatch != nil: | ||||||
// TODO handle epoch not match error | ||||||
// 1. retry download if needed | ||||||
// 2. retry ingest | ||||||
errIngest = errEpochNotMatch | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
break ingestRetry | ||||||
case errPb.RegionNotFound != nil: | ||||||
errIngest = errRegionNotFound | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
break ingestRetry | ||||||
case errPb.KeyNotInRegion != nil: | ||||||
errIngest = errKeyNotInRegion | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
break ingestRetry | ||||||
default: | ||||||
errIngest = errors.Errorf("ingest error %s", errPb) | ||||||
break ingestRetry | ||||||
} | ||||||
err1 = importer.ingestSST(downloadMeta, newInfo) | ||||||
} | ||||||
if err1 != nil { | ||||||
|
||||||
if errIngest != nil { | ||||||
log.Error("ingest file failed", | ||||||
zap.Stringer("file", file), | ||||||
zap.Stringer("range", downloadMeta.GetRange()), | ||||||
zap.Stringer("region", info.Region), | ||||||
zap.Error(err1)) | ||||||
return err1 | ||||||
zap.Error(errIngest)) | ||||||
return errIngest | ||||||
} | ||||||
summary.CollectSuccessUnit(summary.TotalKV, file.TotalKvs) | ||||||
summary.CollectSuccessUnit(summary.TotalBytes, file.TotalBytes) | ||||||
|
@@ -290,7 +326,7 @@ func (importer *FileImporter) downloadSST( | |||||
func (importer *FileImporter) ingestSST( | ||||||
sstMeta *import_sstpb.SSTMeta, | ||||||
regionInfo *RegionInfo, | ||||||
) error { | ||||||
) (*import_sstpb.IngestResponse, error) { | ||||||
leader := regionInfo.Leader | ||||||
if leader == nil { | ||||||
leader = regionInfo.Region.GetPeers()[0] | ||||||
|
@@ -304,26 +340,13 @@ func (importer *FileImporter) ingestSST( | |||||
Context: reqCtx, | ||||||
Sst: sstMeta, | ||||||
} | ||||||
log.Debug("download SST", zap.Stringer("sstMeta", sstMeta)) | ||||||
log.Debug("ingest SST", zap.Stringer("sstMeta", sstMeta), zap.Reflect("leader", leader)) | ||||||
resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req) | ||||||
if err != nil { | ||||||
if strings.Contains(err.Error(), "RegionNotFound") { | ||||||
return errors.Trace(errRegionNotFound) | ||||||
} | ||||||
return errors.Trace(err) | ||||||
} | ||||||
respErr := resp.GetError() | ||||||
if respErr != nil { | ||||||
log.Debug("ingest sst resp error", zap.Stringer("error", respErr)) | ||||||
if respErr.GetKeyNotInRegion() != nil { | ||||||
return errors.Trace(errKeyNotInRegion) | ||||||
} | ||||||
if respErr.GetNotLeader() != nil { | ||||||
return errors.Trace(errNotLeader) | ||||||
} | ||||||
return errors.Wrap(errResp, respErr.String()) | ||||||
log.Error("ingest sst error", zap.Error(err)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you log the error here there will be two error logs (another is "ingest file failed"). Maybe redundant. |
||||||
return nil, errors.Trace(err) | ||||||
} | ||||||
return nil | ||||||
return resp, nil | ||||||
} | ||||||
|
||||||
func checkRegionEpoch(new, old *RegionInfo) bool { | ||||||
|
@@ -347,5 +370,5 @@ func extractDownloadSSTError(e error) error { | |||||
case strings.Contains(e.Error(), "Cannot read"): | ||||||
err = errCannotRead | ||||||
} | ||||||
return errors.Trace(err) | ||||||
return errors.Annotatef(err, "%s", e) | ||||||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the
var newInfo *RegionInfo
inside thecase errPb.NotLeader != nil
please.