Skip to content

Commit

Permalink
Merge branch 'master' into sessionstop
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Jun 24, 2022
2 parents 4dd804e + 3dd54b8 commit 09d4bbd
Show file tree
Hide file tree
Showing 115 changed files with 2,016 additions and 506 deletions.
24 changes: 24 additions & 0 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ def go_deps():
sum = "h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=",
version = "v2.1.2",
)
go_repository(
name = "com_github_charithe_durationcheck",
build_file_proto_mode = "disable",
importpath = "github.com/charithe/durationcheck",
sum = "h1:mPP4ucLrf/rKZiIG/a9IPXHGlh8p4CzgpyTy6EEutYk=",
version = "v0.0.9",
)

go_repository(
name = "com_github_cheggaaa_pb_v3",
build_file_proto_mode = "disable_global",
Expand Down Expand Up @@ -1011,6 +1019,14 @@ def go_deps():
sum = "h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=",
version = "v0.0.0-20181017120253-0766667cb4d1",
)
go_repository(
name = "com_github_gordonklaus_ineffassign",
build_file_proto_mode = "disable",
importpath = "github.com/gordonklaus/ineffassign",
sum = "h1:PVRE9d4AQKmbelZ7emNig1+NT27DUmKZn5qXxfio54U=",
version = "v0.0.0-20210914165742-4cc7213b9bc8",
)

go_repository(
name = "com_github_gorilla_handlers",
build_file_proto_mode = "disable_global",
Expand Down Expand Up @@ -1541,6 +1557,14 @@ def go_deps():
sum = "h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=",
version = "v0.2.0",
)
go_repository(
name = "com_github_kyoh86_exportloopref",
build_file_proto_mode = "disable",
importpath = "github.com/kyoh86/exportloopref",
sum = "h1:5Ry/at+eFdkX9Vsdw3qU4YkvGtzuVfzT4X7S77LoN/M=",
version = "v0.1.8",
)

go_repository(
name = "com_github_labstack_echo_v4",
build_file_proto_mode = "disable_global",
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ func (bc *Client) BackupRanges(
progressCallBack func(ProgressUnit),
) error {
init := time.Now()
defer log.Info("Backup Ranges", zap.Duration("take", time.Since(init)))
defer func() {
log.Info("Backup Ranges", zap.Duration("take", time.Since(init)))
}()

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.BackupRanges", opentracing.ChildOf(span.Context()))
Expand Down
14 changes: 14 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
package errors

import (
"context"
stderrors "errors"

"github.com/pingcap/errors"
)

Expand All @@ -15,6 +18,17 @@ func Is(err error, is *errors.Error) bool {
return errorFound != nil
}

// IsContextCanceled checks whether the is caused by context.Canceled.
// errors.Cause does not work for the error wrapped by %w in fmt.Errorf.
// So we need to call stderrors.Is to unwrap the error.
func IsContextCanceled(err error) bool {
err = errors.Cause(err)
if err == context.Canceled || err == context.DeadlineExceeded {
return true
}
return stderrors.Is(err, context.Canceled) || stderrors.Is(err, context.DeadlineExceeded)
}

// BR errors.
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package errors_test

import (
"context"
"net/url"
"testing"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestIsContextCanceled(t *testing.T) {
require.False(t, berrors.IsContextCanceled(nil))
require.False(t, berrors.IsContextCanceled(errors.New("connection closed")))
require.True(t, berrors.IsContextCanceled(context.Canceled))
require.True(t, berrors.IsContextCanceled(context.DeadlineExceeded))
require.True(t, berrors.IsContextCanceled(errors.Trace(context.Canceled)))
require.True(t, berrors.IsContextCanceled(errors.Trace(context.DeadlineExceeded)))
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.Canceled}))
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.DeadlineExceeded}))
}
1 change: 1 addition & 0 deletions br/pkg/lightning/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/config",
"//br/pkg/lightning/glue",
"//br/pkg/lightning/log",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/web",
"@com_github_docker_go_units//:go-units",
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,8 +1392,32 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
return err
}

if len(ranges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()

var startKey, endKey []byte
if len(ranges[0].start) > 0 {
startKey = codec.EncodeBytes(nil, ranges[0].start)
}
if len(ranges[len(ranges)-1].end) > 0 {
endKey = codec.EncodeBytes(nil, ranges[len(ranges)-1].end)
}
done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
defer func() {
cancel()
<-done
}()
}

log.FromContext(ctx).Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))

failpoint.Inject("ReadyForImportEngine", func() {})

for {
unfinishedRanges := lf.unfinishedRanges(ranges)
if len(unfinishedRanges) == 0 {
Expand Down
59 changes: 29 additions & 30 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"database/sql"
"math"
"regexp"
"runtime"
"sort"
"strings"
Expand Down Expand Up @@ -332,14 +331,7 @@ func (local *local) SplitAndScatterRegionByRanges(
}

startTime := time.Now()
scatterCount := 0
for _, region := range scatterRegions {
local.waitForScatterRegion(ctx, region)
if time.Since(startTime) > split.ScatterWaitUpperInterval {
break
}
scatterCount++
}
scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
if scatterCount == len(scatterRegions) {
log.FromContext(ctx).Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
Expand All @@ -349,7 +341,8 @@ func (local *local) SplitAndScatterRegionByRanges(
zap.Int("skipped_keys", skippedKeys),
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
zap.Duration("take", time.Since(startTime)),
zap.Error(err))
}
return nil
}
Expand Down Expand Up @@ -447,28 +440,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
}
}

func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
if ok {
return
}
if err != nil {
if !common.IsRetryableError(err) {
log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
return
func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) {
subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval)
defer cancel()

for len(regions) > 0 {
var retryRegions []*split.RegionInfo
for _, region := range regions {
scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region)
if scattered {
scatterCount++
continue
}
log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
if err != nil {
if !common.IsRetryableError(err) {
log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err))
return scatterCount, err
}
log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err))
}
retryRegions = append(retryRegions, region)
}
regions = retryRegions
select {
case <-time.After(time.Second):
case <-ctx.Done():
case <-subCtx.Done():
return
}
}
return scatterCount, nil
}

func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
if err != nil {
return false, err
Expand All @@ -478,13 +481,9 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
return true, nil
}
// don't return error if region replicate not complete
// TODO: should add a new error type to avoid this check by string matching
matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message)
if matches {
return false, nil
}
return false, errors.Errorf("get operator error: %s", respErr.GetType())
return false, errors.Errorf(
"failed to get region operator, error type: %s, error message: %s",
respErr.GetType().String(), respErr.GetMessage())
}
// If the current operator of the region is not 'scatter-region', we could assume
// that 'scatter-operator' has finished.
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"net"
"os"
"regexp"
"syscall"

"github.com/go-sql-driver/mysql"
Expand All @@ -30,6 +31,8 @@ import (
"google.golang.org/grpc/status"
)

var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)

// IsRetryableError returns whether the error is transient (e.g. network
// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
// function returns `false` (irrecoverable) if `err == nil`.
Expand Down Expand Up @@ -88,6 +91,9 @@ func isSingleRetryableError(err error) bool {
}
return false
default:
if regionNotFullyReplicatedRe.MatchString(err.Error()) {
return true
}
switch status.Code(err) {
case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return true
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,6 @@ func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)))
require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})))
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))

require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
}
1 change: 1 addition & 0 deletions br/pkg/lightning/errormanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_test(
embed = [":errormanager"],
deps = [
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/utils",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_stretchr_testify//require",
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

// Enable failpoint http API for testing.
failpoint.Inject("EnableTestAPI", func() {
mux.HandleFunc("/fail/", func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/fail")
new(failpoint.HttpHandler).ServeHTTP(w, r)
})
})

handleTasks := http.StripPrefix("/tasks", http.HandlerFunc(l.handleTask))
mux.Handle("/tasks", httpHandleWrapper(handleTasks.ServeHTTP))
mux.Handle("/tasks/", httpHandleWrapper(handleTasks.ServeHTTP))
Expand Down
14 changes: 14 additions & 0 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ type taskMetaMgr interface {
// need to update or any new tasks. There is at most one lightning who can execute the action function at the same time.
// Note that action may be executed multiple times due to transaction retry, caller should make sure it's idempotent.
CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error
// CanPauseSchedulerByKeyRange returns whether the scheduler can pause by the key range.
CanPauseSchedulerByKeyRange() bool
CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
Expand Down Expand Up @@ -867,6 +869,10 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
}, nil
}

func (m *dbTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
// the second boolean indicates whether to clean up the metadata in tidb
Expand Down Expand Up @@ -1058,6 +1064,10 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.
}, nil
}

func (m noopTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return false
}

func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
}
Expand Down Expand Up @@ -1168,6 +1178,10 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
return m.pd.RemoveSchedulers(ctx)
}

func (m *singleTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return m.initialized, nil
}
Expand Down
Loading

0 comments on commit 09d4bbd

Please sign in to comment.