Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
backup: allow backup tolerate minor TiKV failure (#997) (#1019)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot committed Apr 15, 2021
1 parent 6ec2592 commit 264059b
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 61 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,12 @@ static: prepare tools
$(PACKAGE_DIRECTORIES)
# pingcap/errors APIs are mixed with multiple patterns 'pkg/errors',
# 'juju/errors' and 'pingcap/parser'. To avoid confusion and mistake,
# we only allow a subset of APIs, that's "Normalize|Annotate|Trace|Cause".
# we only allow a subset of APIs, that's "Normalize|Annotate|Trace|Cause|Find".
# TODO: check lightning packages.
@# TODO: allow more APIs when we need to support "workaround".
grep -Rn --include="*.go" --exclude="*_test.go" -E "(\t| )errors\.[A-Z]" \
$$($(PACKAGE_DIRECTORIES) | grep -vE "tests|lightning") | \
grep -vE "Normalize|Annotate|Trace|Cause|RedactLogEnabled" 2>&1 | $(CHECKER)
grep -vE "Normalize|Annotate|Trace|Cause|RedactLogEnabled|Find" 2>&1 | $(CHECKER)
# The package name of "github.com/pingcap/kvproto/pkg/backup" collides
# "github.com/pingcap/br/pkg/backup", so we rename kvproto to backuppb.
grep -Rn --include="*.go" -E '"github.com/pingcap/kvproto/pkg/backup"' \
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ error = '''
backup no leader
'''

["BR:Common:ErrFailedToConnect"]
error = '''
failed to make gRPC channels
'''

["BR:Common:ErrInvalidArgument"]
error = '''
invalid argument
Expand Down
56 changes: 51 additions & 5 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (bc *Client) SaveBackupMeta(ctx context.Context, backupMeta *backuppb.Backu
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to find shell to notify, skipping notify", zap.Error(err))
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
Expand Down Expand Up @@ -613,6 +613,21 @@ func (bc *Client) fineGrainedBackup(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

failpoint.Inject("hint-fine-grained-backup", func(v failpoint.Value) {
log.Info("failpoint hint-fine-grained-backup injected, "+
"process will sleep for 3s and notify the shell.", zap.String("file", v.(string)))
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
}
time.Sleep(3 * time.Second)
}
})

bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
for {
// Step1, check whether there is any incomplete range
Expand Down Expand Up @@ -809,8 +824,15 @@ func (bc *Client) handleFineGrained(
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
if err != nil {
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is dead,
// 20s for the default max duration before the raft election timer fires.
log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
}

log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
return 0, errors.Trace(err)
return 0, errors.Annotatef(err, "failed to connect to store %d", storeID)
}
err = SendBackup(
ctx, storeID, client, req,
Expand All @@ -834,7 +856,15 @@ func (bc *Client) handleFineGrained(
return bc.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
return 0, errors.Trace(err)
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is dead,
// 20s for the default max duration before the raft election timer fires.
log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
}
log.Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err))
return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)",
redact.Key(req.StartKey), redact.Key(req.EndKey))
}
return max, nil
}
Expand Down Expand Up @@ -867,6 +897,20 @@ backupLoop:
zap.Uint64("storeID", storeID),
zap.Int("retry time", retry),
)
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
log.Info("failpoint hint-backup-start injected, " +
"process will notify the shell.")
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
}
}
time.Sleep(3 * time.Second)
})
bcli, err := client.Backup(ctx, &req)
failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
if val.(bool) {
Expand All @@ -892,8 +936,9 @@ backupLoop:
}
log.Error("fail to backup", zap.Uint64("StoreID", storeID),
zap.Int("retry time", retry))
return errors.Trace(err)
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID)
}
defer bcli.CloseSend()

for {
resp, err := bcli.Recv()
Expand All @@ -914,8 +959,9 @@ backupLoop:
}
break
}
return errors.Annotatef(err, "failed to connect to store: %d with retry times:%d", storeID, retry)
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to connect to store: %d with retry times:%d", storeID, retry)
}

// TODO: handle errors in the resp.
log.Info("range backuped",
logutil.Key("startKey", resp.GetStartKey()),
Expand Down
20 changes: 17 additions & 3 deletions pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/redact"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/utils"
)
Expand Down Expand Up @@ -51,6 +53,11 @@ func (push *pushDown) pushBackup(

// Push down backup tasks to all tikv instances.
res := rtree.NewRangeTree()
failpoint.Inject("noop-backup", func(_ failpoint.Value) {
log.Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey))
failpoint.Return(res, nil)
})

wg := new(sync.WaitGroup)
for _, s := range stores {
storeID := s.GetId()
Expand All @@ -60,8 +67,10 @@ func (push *pushDown) pushBackup(
}
client, err := push.mgr.GetBackupClient(ctx, storeID)
if err != nil {
log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
return res, errors.Trace(err)
// BR should be able to back up even if some of the stores are disconnected.
// The regions managed by this store can be retried at fine-grained backup then.
log.Warn("fail to connect store, skipping", zap.Uint64("StoreID", storeID), zap.Error(err))
return res, nil
}
wg.Add(1)
go func() {
Expand All @@ -77,6 +86,7 @@ func (push *pushDown) pushBackup(
log.Warn("reset the connection in push", zap.Uint64("storeID", storeID))
return push.mgr.ResetBackupClient(ctx, storeID)
})
// Disconnected stores can be ignored.
if err != nil {
push.errCh <- err
return
Expand Down Expand Up @@ -133,7 +143,11 @@ func (push *pushDown) pushBackup(
}
}
case err := <-push.errCh:
return res, errors.Trace(err)
if !berrors.Is(err, berrors.ErrFailedToConnect) {
return res, errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey))
}
log.Warn("skipping disconnected stores", logutil.ShortError(err))
return res, nil
}
}
}
19 changes: 18 additions & 1 deletion pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ package conn
import (
"context"
"crypto/tls"
"os"
"sync"
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -245,6 +247,20 @@ func NewMgr(
}

func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
failpoint.Inject("hint-get-backup-client", func(v failpoint.Value) {
log.Info("failpoint hint-get-backup-client injected, "+
"process will notify the shell.", zap.Uint64("store", storeID))
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
}
}
time.Sleep(3 * time.Second)
})
store, err := mgr.GetPDClient().GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -270,7 +286,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
)
cancel()
if err != nil {
return nil, errors.Trace(err)
return nil, berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to make connection to store %d", storeID)
}
return conn, nil
}
Expand All @@ -283,6 +299,7 @@ func (mgr *Mgr) GetBackupClient(ctx context.Context, storeID uint64) (backuppb.B

mgr.grpcClis.mu.Lock()
defer mgr.grpcClis.mu.Unlock()

if conn, ok := mgr.grpcClis.clis[storeID]; ok {
// Find a cached backup client.
return backuppb.NewBackupClient(conn), nil
Expand Down
10 changes: 10 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,21 @@ import (
"github.com/pingcap/errors"
)

// Is reports whether errors.Find can locate, starting from `err`, an
// *errors.Error whose ID equals the ID of `is`.
//
// Errors that are not *errors.Error values never match, and matching is done
// by comparing IDs rather than by comparing the error values themselves.
func Is(err error, is *errors.Error) bool {
	sameID := func(candidate error) bool {
		normalized, isNormalized := candidate.(*errors.Error)
		if !isNormalized {
			return false
		}
		return normalized.ID() == is.ID()
	}
	return errors.Find(err, sameID) != nil
}

// BR errors.
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
Expand Down
Loading

0 comments on commit 264059b

Please sign in to comment.