diff --git a/Makefile b/Makefile
index e8cffc534..088212449 100644
--- a/Makefile
+++ b/Makefile
@@ -215,12 +215,12 @@ static: prepare tools $(PACKAGE_DIRECTORIES)
 	# pingcap/errors APIs are mixed with multiple patterns 'pkg/errors',
 	# 'juju/errors' and 'pingcap/parser'. To avoid confusion and mistake,
-	# we only allow a subset of APIs, that's "Normalize|Annotate|Trace|Cause".
+	# we only allow a subset of APIs, that's "Normalize|Annotate|Trace|Cause|Find".
 	# TODO: check lightning packages.
 	@# TODO: allow more APIs when we need to support "workaound".
 	grep -Rn --include="*.go" --exclude="*_test.go" -E "(\t| )errors\.[A-Z]" \
 		$$($(PACKAGE_DIRECTORIES) | grep -vE "tests|lightning") | \
-		grep -vE "Normalize|Annotate|Trace|Cause|RedactLogEnabled" 2>&1 | $(CHECKER)
+		grep -vE "Normalize|Annotate|Trace|Cause|RedactLogEnabled|Find" 2>&1 | $(CHECKER)
 	# The package name of "github.com/pingcap/kvproto/pkg/backup" collides
 	# "github.com/pingcap/br/pkg/backup", so we rename kvproto to backuppb.
 	grep -Rn --include="*.go" -E '"github.com/pingcap/kvproto/pkg/backup"' \
diff --git a/errors.toml b/errors.toml
index 07c595dfe..c8b935085 100644
--- a/errors.toml
+++ b/errors.toml
@@ -21,6 +21,11 @@ error = '''
 backup no leader
 '''
 
+["BR:Common:ErrFailedToConnect"]
+error = '''
+failed to make gRPC channels
+'''
+
 ["BR:Common:ErrInvalidArgument"]
 error = '''
 invalid argument
diff --git a/pkg/backup/client.go b/pkg/backup/client.go
index 9a78ec450..7fe4d0310 100644
--- a/pkg/backup/client.go
+++ b/pkg/backup/client.go
@@ -38,6 +38,7 @@ import (
 	"github.com/pingcap/br/pkg/conn"
 	berrors "github.com/pingcap/br/pkg/errors"
 	"github.com/pingcap/br/pkg/logutil"
+	"github.com/pingcap/br/pkg/redact"
 	"github.com/pingcap/br/pkg/rtree"
 	"github.com/pingcap/br/pkg/storage"
 	"github.com/pingcap/br/pkg/summary"
@@ -221,7 +222,7 @@ func (bc *Client) SaveBackupMeta(ctx context.Context, backupMeta *backuppb.Backu
 		if sigFile, ok := v.(string); ok {
 			file, err := os.Create(sigFile)
 			if err != nil {
-				log.Warn("failed to find shell to notify, skipping notify", zap.Error(err))
+				log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
 			}
 			if file != nil {
 				file.Close()
@@ -609,6 +610,21 @@ func (bc *Client) fineGrainedBackup(
 	rangeTree rtree.RangeTree,
 	progressCallBack func(ProgressUnit),
 ) error {
+	failpoint.Inject("hint-fine-grained-backup", func(v failpoint.Value) {
+		log.Info("failpoint hint-fine-grained-backup injected, "+
+			"process will sleep for 3s and notify the shell.", zap.String("file", v.(string)))
+		if sigFile, ok := v.(string); ok {
+			file, err := os.Create(sigFile)
+			if err != nil {
+				log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
+			}
+			if file != nil {
+				file.Close()
+			}
+			time.Sleep(3 * time.Second)
+		}
+	})
+
 	bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
 	for {
 		// Step1, check whether there is any incomplete range
@@ -804,8 +820,15 @@ func (bc *Client) handleFineGrained(
 	lockResolver := bc.mgr.GetLockResolver()
 	client, err := bc.mgr.GetBackupClient(ctx, storeID)
 	if err != nil {
+		if berrors.Is(err, berrors.ErrFailedToConnect) {
+			// When the leader store is down, back off 20s,
+			// the default max duration before the raft election timer fires.
+ log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) + return 20000, nil + } + log.Error("fail to connect store", zap.Uint64("StoreID", storeID)) - return 0, errors.Trace(err) + return 0, errors.Annotatef(err, "failed to connect to store %d", storeID) } hasProgress := false backoffMill := 0 @@ -835,7 +858,15 @@ func (bc *Client) handleFineGrained( return bc.mgr.ResetBackupClient(ctx, storeID) }) if err != nil { - return 0, errors.Trace(err) + if berrors.Is(err, berrors.ErrFailedToConnect) { + // When the leader store is died, + // 20s for the default max duration before the raft election timer fires. + log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID)) + return 20000, nil + } + log.Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err)) + return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)", + redact.Key(req.StartKey), redact.Key(req.EndKey)) } // If no progress, backoff 10s for debouncing. @@ -866,6 +897,20 @@ backupLoop: zap.Uint64("storeID", storeID), zap.Int("retry time", retry), ) + failpoint.Inject("hint-backup-start", func(v failpoint.Value) { + log.Info("failpoint hint-backup-start injected, " + + "process will notify the shell.") + if sigFile, ok := v.(string); ok { + file, err := os.Create(sigFile) + if err != nil { + log.Warn("failed to create file for notifying, skipping notify", zap.Error(err)) + } + if file != nil { + file.Close() + } + } + time.Sleep(3 * time.Second) + }) bcli, err := client.Backup(ctx, &req) failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { if val.(bool) { @@ -891,8 +936,11 @@ backupLoop: } log.Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry time", retry)) - return errors.Trace(err) + return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID) } + // It's strange this can pass errcheck in both release-5.0 and master + // nolint:errcheck + defer bcli.CloseSend() for { resp, err := bcli.Recv() @@ -913,8 +961,9 @@ backupLoop: } break } - return errors.Annotatef(err, "failed to connect to store: %d with retry times:%d", storeID, retry) + return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to connect to store: %d with retry times:%d", storeID, retry) } + // TODO: handle errors in the resp. log.Info("range backuped", logutil.Key("startKey", resp.GetStartKey()), diff --git a/pkg/backup/push.go b/pkg/backup/push.go index 13a9d054e..9aeae4145 100644 --- a/pkg/backup/push.go +++ b/pkg/backup/push.go @@ -14,6 +14,8 @@ import ( "go.uber.org/zap" berrors "github.com/pingcap/br/pkg/errors" + "github.com/pingcap/br/pkg/logutil" + "github.com/pingcap/br/pkg/redact" "github.com/pingcap/br/pkg/rtree" "github.com/pingcap/br/pkg/utils" ) @@ -43,6 +45,11 @@ func (push *pushDown) pushBackup( ) (rtree.RangeTree, error) { // Push down backup tasks to all tikv instances. 
 	res := rtree.NewRangeTree()
+	failpoint.Inject("noop-backup", func(_ failpoint.Value) {
+		log.Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey))
+		failpoint.Return(res, nil)
+	})
+
 	wg := new(sync.WaitGroup)
 	for _, s := range stores {
 		storeID := s.GetId()
@@ -52,8 +59,10 @@ func (push *pushDown) pushBackup(
 		}
 		client, err := push.mgr.GetBackupClient(ctx, storeID)
 		if err != nil {
-			log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
-			return res, errors.Trace(err)
+			// BR should be able to back up even if some stores are disconnected.
+			// The regions managed by this store can then be retried during fine-grained backup.
+			log.Warn("fail to connect store, skipping", zap.Uint64("StoreID", storeID), zap.Error(err))
+			return res, nil
 		}
 		wg.Add(1)
 		go func() {
@@ -69,6 +78,7 @@ func (push *pushDown) pushBackup(
 				log.Warn("reset the connection in push", zap.Uint64("storeID", storeID))
 				return push.mgr.ResetBackupClient(ctx, storeID)
 			})
+			// Disconnected stores can be ignored.
 			if err != nil {
 				push.errCh <- err
 				return
@@ -125,7 +135,11 @@ func (push *pushDown) pushBackup(
 				}
 			}
 		case err := <-push.errCh:
-			return res, errors.Trace(err)
+			if !berrors.Is(err, berrors.ErrFailedToConnect) {
+				return res, errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey))
+			}
+			log.Warn("skipping disconnected stores", logutil.ShortError(err))
+			return res, nil
 		}
 	}
 }
diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go
index ab01d771e..ae1de0829 100644
--- a/pkg/conn/conn.go
+++ b/pkg/conn/conn.go
@@ -5,11 +5,13 @@ package conn
 import (
 	"context"
 	"crypto/tls"
+	"os"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	backuppb "github.com/pingcap/kvproto/pkg/backup"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
@@ -171,6 +173,20 @@ func NewMgr(
 }
 
 func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
+	failpoint.Inject("hint-get-backup-client", func(v failpoint.Value) {
+		log.Info("failpoint hint-get-backup-client injected, "+
+			"process will notify the shell.", zap.Uint64("store", storeID))
+		if sigFile, ok := v.(string); ok {
+			file, err := os.Create(sigFile)
+			if err != nil {
+				log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
+			}
+			if file != nil {
+				file.Close()
+			}
+		}
+		time.Sleep(3 * time.Second)
+	})
 	store, err := mgr.GetPDClient().GetStore(ctx, storeID)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -196,7 +212,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
 	)
 	cancel()
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to make connection to store %d", storeID)
 	}
 	return conn, nil
 }
@@ -209,6 +225,7 @@ func (mgr *Mgr) GetBackupClient(ctx context.Context, storeID uint64) (backuppb.B
 
 	mgr.grpcClis.mu.Lock()
 	defer mgr.grpcClis.mu.Unlock()
+
 	if conn, ok := mgr.grpcClis.clis[storeID]; ok {
 		// Find a cached backup client.
 		return backuppb.NewBackupClient(conn), nil
diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go
index f215c280d..c74346c28 100644
--- a/pkg/errors/errors.go
+++ b/pkg/errors/errors.go
@@ -6,11 +6,22 @@ import (
 	"github.com/pingcap/errors"
 )
 
+// Is tests whether the specified error causes the error `err`.
+func Is(err error, is *errors.Error) bool {
+	errorFound := errors.Find(err, func(e error) bool {
+		//nolint:errorlint
+		normalizedErr, ok := e.(*errors.Error)
+		return ok && normalizedErr.ID() == is.ID()
+	})
+	return errorFound != nil
+}
+
 // BR errors.
 var (
 	ErrUnknown         = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
 	ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
 	ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
+	ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))
 
 	ErrPDUpdateFailed   = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
 	ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
diff --git a/tests/_utils/run_services b/tests/_utils/run_services
index 17350eb0e..ad39b2f77 100644
--- a/tests/_utils/run_services
+++ b/tests/_utils/run_services
@@ -28,6 +28,7 @@ export TIKV_COUNT=3
 export TIFLASH_STATUS="127.0.0.1:17000"
 export TIFLASH_HTTP="127.0.0.1:8125"
 export IMPORTER_ADDR="127.0.0.1:8808"
+export TIKV_PIDS="${TEST_DIR:?}/tikv_pids.txt"
 
 cleanup_data() {
     # Clean up data
@@ -36,18 +37,26 @@ cleanup_data() {
     done
 }
 
+stop() {
+    svc=$1
+    killall -v -1 "$svc" 2>/dev/null || return 0
+    sleep 1 # give some grace shutdown period
+    killall -v -9 "$svc" &>/dev/null || return 0
+}
+
 stop_services() {
     for svc in "br" "tidb-server" "tiflash" "TiFlashMain" "tikv-server" "pd-server" "cdc" "minio" "tikv-importer"; do
-        killall -v -1 $svc 2>/dev/null || continue
-        sleep 1 # give some grace shutdown period
-        killall -v -9 $svc &>/dev/null || continue
+        stop $svc &
     done
+    sleep 2 # give some time for the OS to reap all processes
+    lsof -n -P -i :2379 -i :4000 -i :10080 -i :20161 -i :20162 -i :20163 -i :20181 -i :20182 -i :20183 -i :17000 -i :8125 || true
 }
 
 start_services() {
     max_retry=3
     for retry_time in $(seq 1 $max_retry); do
-        if start_services_impl $@; then
+        # run it in a subshell so the failure won't stop execution.
+        if ( start_services_impl "$@" ); then
             return 0
         fi
         stop_services
@@ -58,36 +67,7 @@ start_services() {
     return 1
 }
 
-start_services_impl() {
-    stop_services || true
-    cleanup_data || true
-
-    TIDB_CONFIG="tests/config/tidb.toml"
-    TIKV_CONFIG="tests/config/tikv.toml"
-    PD_CONFIG="tests/config/pd.toml"
-    RUN_TIFLASH="YES"
-
-    while [[ $# -gt 0 ]]
-    do
-        local key="$1"
-
-        case $key in
-        --tidb-cfg)
-            TIDB_CONFIG="$2"
-            shift # past argument
-            shift # past value
-            ;;
-        --no-tiflash)
-            RUN_TIFLASH="NO"
-            shift # past argument
-            ;;
-        *) # unknown option
-            echo "Unknown args $@"
-            exit 1
-            ;;
-        esac
-    done
-
+start_pd() {
     echo "Starting PD..."
     mkdir -p "$TEST_DIR/pd"
     bin/pd-server \
@@ -106,20 +86,62 @@ start_services_impl() {
         fi
         sleep 3
     done
+}
 
-    echo "Starting TiKV..."
-    for i in $(seq $TIKV_COUNT); do
-        mkdir -p "$TEST_DIR/tikv${i}"
-        bin/tikv-server \
-            --pd "$PD_ADDR" \
-            -A "$TIKV_ADDR$i" \
-            --status-addr "$TIKV_STATUS_ADDR$i" \
-            --log-file "$TEST_DIR/tikv${i}.log" \
-            --log-level info \
-            -C "$TIKV_CONFIG" \
-            -s "$TEST_DIR/tikv${i}" &
+kv_outage() {
+    dur=""
+    id=()
+    scale_in=false
+    scale_out=false
+    until [ $# -eq 0 ]; do
+        case $1 in
+        --duration | -d) shift; dur=$1 ;;
+        --id | -i) shift; id+=("$1") ;;
+        --scale-out) scale_out=true ;;
+        --kill) scale_in=true ;;
+        esac
+        shift
+    done
+
+    $scale_out || {
+        for i in "${id[@]}"; do
+            target=$(cat "${TIKV_PIDS}_$i" | awk '{print $1}')
+            echo "killing TiKV $target(#$i)"
+            kill "$target" || true
+            sleep 1
+            kill -9 "$target" || true
+        done
+    }
+    $scale_in || $scale_out || sleep "$dur"
+    $scale_in || {
+        for i in "${id[@]}"; do
+            if [ -e "${TIKV_PIDS}_$i" ]; then
+                TIKV_CONFIG=$(cat "${TIKV_PIDS}_$i" | awk '{print $2}')
+            else
+                TIKV_CONFIG=${TIKV_CONFIG:-"tests/config/tikv.toml"}
+            fi
+            start_tikv "$i"
+        done
+    }
+}
+
+start_tikv() {
+    i=$1
+    echo "Starting TiKV($i)..."
+    mkdir -p "$TEST_DIR/tikv${i}"
+    bin/tikv-server \
+        --pd "$PD_ADDR" \
+        -A "$TIKV_ADDR$i" \
+        --status-addr "$TIKV_STATUS_ADDR$i" \
+        --log-file "$TEST_DIR/tikv${i}.log" \
+        --log-level info \
+        -C "$TIKV_CONFIG" \
+        -s "$TEST_DIR/tikv${i}" &
+    pid=$!
+    echo -e "$pid\t$TIKV_CONFIG" > "${TIKV_PIDS}_${i}"
+}
+
+ensure_tikv() {
     echo "Waiting initializing TiKV..."
     while ! run_curl "https://$PD_ADDR/pd/api/v1/cluster/status" | grep '"is_initialized": true'; do
         i=$((i+1))
@@ -129,7 +151,9 @@ start_services_impl() {
         fi
         sleep 5
     done
+}
 
+start_tidb() {
     echo "Starting TiDB..."
     bin/tidb-server \
         -P 4000 \
@@ -150,18 +174,60 @@ start_services_impl() {
         fi
         sleep 3
     done
+}
 
+start_importer() {
     echo "Starting Importer..."
     bin/tikv-importer \
         --addr "$IMPORTER_ADDR" \
         --import-dir "$TEST_DIR/importer" \
         --log-file "$TEST_DIR/importer.log" \
         --config "tests/config/importer.toml" &
+}
 
-    if [[ $RUN_TIFLASH == "YES" ]]; then
-        if ! start_tiflash; then
-            return 1
-        fi
+
+start_services_impl() {
+    stop_services || true
+    cleanup_data || true
+
+    TIDB_CONFIG="tests/config/tidb.toml"
+    TIKV_CONFIG="tests/config/tikv.toml"
+    PD_CONFIG="tests/config/pd.toml"
+    RUN_TIFLASH=true
+
+    while [[ $# -gt 0 ]]
+    do
+        local key="$1"
+
+        case $key in
+        --tidb-cfg)
+            TIDB_CONFIG="$2"
+            shift # past argument
+            shift # past value
+            ;;
+        --no-tiflash)
+            RUN_TIFLASH=false
+            shift # past argument
+            ;;
+        *) # unknown option
+            echo "Unknown args $1"
+            exit 1
+            ;;
+        esac
+    done
+
+    rm -f "${TIKV_PIDS}"*
+
+    start_pd
+    for i in $(seq $TIKV_COUNT); do
+        start_tikv "$i"
+    done
+    ensure_tikv
+    start_tidb
+    start_importer
+
+    if $RUN_TIFLASH; then
+        start_tiflash
     fi
 
     i=0
diff --git a/tests/br_serv_outage/run.sh b/tests/br_serv_outage/run.sh
new file mode 100644
index 000000000..2d4a30913
--- /dev/null
+++ b/tests/br_serv_outage/run.sh
@@ -0,0 +1,82 @@
+#! /bin/bash
+
+set -eux
+
+. run_services
+
+wait_file_exist() {
+    until [ -e "$1" ]; do
+        sleep 1
+    done
+}
+
+single_point_fault() {
+    type=$1
+    victim=$(shuf -i 1-3 -n 1)
+    echo "Will make failure($type) to store#$victim."
+    case $type in
+        outage)
+            wait_file_exist "$hint_backup_start"
+            kv_outage -d 30 -i $victim;;
+        outage-after-request)
+            wait_file_exist "$hint_get_backup_client"
+            kv_outage -d 30 -i $victim;;
+        outage-at-finegrained)
+            wait_file_exist "$hint_finegrained"
+            kv_outage --kill -i $victim;;
+        shutdown)
+            wait_file_exist "$hint_backup_start"
+            kv_outage --kill -i $victim;;
+        scale-out)
+            wait_file_exist "$hint_backup_start"
+            kv_outage --kill -i $victim
+            kv_outage --scale-out -i 4;;
+    esac
+}
+
+load() {
+    run_sql "create database if not exists $TEST_NAME"
+    go-ycsb load mysql -P tests/"$TEST_NAME"/workload -p mysql.host="$TIDB_IP" -p mysql.port="$TIDB_PORT" -p mysql.user=root -p mysql.db="$TEST_NAME"
+    run_sql 'use '$TEST_NAME'; show tables'
+}
+
+check() {
+    run_sql 'drop database if exists '$TEST_NAME';'
+    run_br restore full -s local://"$backup_dir"
+    count=$(run_sql 'select count(*) from '$TEST_NAME'.usertable;' | tail -n 1 | awk '{print $2}')
+    [ "$count" -eq 20000 ]
+}
+
+load
+
+hint_finegrained=$TEST_DIR/hint_finegrained
+hint_backup_start=$TEST_DIR/hint_backup_start
+hint_get_backup_client=$TEST_DIR/hint_get_backup_client
+
+
+cases=${cases:-'outage outage-after-request outage-at-finegrained shutdown scale-out'}
+
+for failure in $cases; do
+    rm -f "$hint_finegrained" "$hint_backup_start" "$hint_get_backup_client"
+    export GO_FAILPOINTS="github.com/pingcap/br/pkg/backup/hint-backup-start=1*return(\"$hint_backup_start\");\
+github.com/pingcap/br/pkg/backup/hint-fine-grained-backup=1*return(\"$hint_finegrained\");\
+github.com/pingcap/br/pkg/conn/hint-get-backup-client=1*return(\"$hint_get_backup_client\")"
+    if [ "$failure" = outage-at-finegrained ]; then
+        export GO_FAILPOINTS="$GO_FAILPOINTS;github.com/pingcap/br/pkg/backup/noop-backup=return(true)"
+    fi
+
+    backup_dir=${TEST_DIR:?}/"backup{test:${TEST_NAME}|with:${failure}}"
+    rm -rf "${backup_dir:?}"
+    run_br backup full -s local://"$backup_dir" --ratelimit 128 --ratelimit-unit 1024 &
+    backup_pid=$!
+    single_point_fault $failure
+    wait $backup_pid
+    case $failure in
+        scale-out | shutdown | outage-at-finegrained ) stop_services
+            start_services ;;
+        *) ;;
+    esac
+
+
+    check
+done
diff --git a/tests/br_serv_outage/workload b/tests/br_serv_outage/workload
new file mode 100644
index 000000000..de43df83b
--- /dev/null
+++ b/tests/br_serv_outage/workload
@@ -0,0 +1,12 @@
+recordcount=20000
+operationcount=0
+workload=core
+
+readallfields=true
+
+readproportion=0
+updateproportion=0
+scanproportion=0
+insertproportion=0
+
+requestdistribution=uniform