lightning: support disable scheduler by key range (#34130)
close #35148
sleepymole authored Jun 24, 2022
1 parent 3f8df8c commit 3dd54b8
Showing 17 changed files with 416 additions and 60 deletions.
14 changes: 14 additions & 0 deletions br/pkg/errors/errors.go
@@ -3,6 +3,9 @@
package errors

import (
"context"
stderrors "errors"

"github.com/pingcap/errors"
)

@@ -15,6 +18,17 @@ func Is(err error, is *errors.Error) bool {
return errorFound != nil
}

// IsContextCanceled checks whether the error is caused by context.Canceled
// or context.DeadlineExceeded. errors.Cause does not unwrap errors wrapped
// with %w by fmt.Errorf, so we fall back to stderrors.Is, which does.
func IsContextCanceled(err error) bool {
err = errors.Cause(err)
if err == context.Canceled || err == context.DeadlineExceeded {
return true
}
return stderrors.Is(err, context.Canceled) || stderrors.Is(err, context.DeadlineExceeded)
}

// BR errors.
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
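A small illustration of the case the comment above describes (a sketch, assuming it runs inside the tidb module so the br/pkg/errors import resolves): an error wrapped with %w carries no Cause() method, so errors.Cause alone cannot reach context.Canceled, but the stderrors.Is fallback still detects it.

package main

import (
	"context"
	"fmt"

	berrors "github.com/pingcap/tidb/br/pkg/errors"
)

func main() {
	// fmt.Errorf with %w produces a wrapper that only implements Unwrap,
	// which is exactly the case the stderrors.Is fallback handles.
	err := fmt.Errorf("import stopped: %w", context.Canceled)
	fmt.Println(berrors.IsContextCanceled(err)) // true
}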
24 changes: 24 additions & 0 deletions br/pkg/errors/errors_test.go
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package errors_test

import (
"context"
"net/url"
"testing"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestIsContextCanceled(t *testing.T) {
require.False(t, berrors.IsContextCanceled(nil))
require.False(t, berrors.IsContextCanceled(errors.New("connection closed")))
require.True(t, berrors.IsContextCanceled(context.Canceled))
require.True(t, berrors.IsContextCanceled(context.DeadlineExceeded))
require.True(t, berrors.IsContextCanceled(errors.Trace(context.Canceled)))
require.True(t, berrors.IsContextCanceled(errors.Trace(context.DeadlineExceeded)))
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.Canceled}))
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.DeadlineExceeded}))
}
24 changes: 24 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -1392,8 +1392,32 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
return err
}

if len(ranges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()

var startKey, endKey []byte
if len(ranges[0].start) > 0 {
startKey = codec.EncodeBytes(nil, ranges[0].start)
}
if len(ranges[len(ranges)-1].end) > 0 {
endKey = codec.EncodeBytes(nil, ranges[len(ranges)-1].end)
}
done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
defer func() {
cancel()
<-done
}()
}

log.FromContext(ctx).Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))

failpoint.Inject("ReadyForImportEngine", func() {})

for {
unfinishedRanges := lf.unfinishedRanges(ranges)
if len(unfinishedRanges) == 0 {
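A minimal, self-contained sketch of the pause/resume handshake used in the ImportEngine hunk above (pauseByKeyRange is only a stand-in for pdutil's PauseSchedulersByKeyRange, whose implementation is not part of this diff): canceling the sub-context asks the background keep-alive loop to resume the schedulers, and receiving from done guarantees the resume has finished before the function returns.

package main

import (
	"context"
	"fmt"
	"time"
)

// pauseByKeyRange pretends to pause scheduling for [startKey, endKey) and
// keeps the pause alive until ctx is canceled; done is closed only after
// the schedulers have been resumed.
func pauseByKeyRange(ctx context.Context, startKey, endKey []byte) (<-chan struct{}, error) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		<-ctx.Done()
		fmt.Println("schedulers resumed for range")
	}()
	fmt.Printf("schedulers paused for range %q..%q\n", startKey, endKey)
	return done, nil
}

func main() {
	subCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done, err := pauseByKeyRange(subCtx, []byte("a"), []byte("z"))
	if err != nil {
		panic(err)
	}
	defer func() {
		cancel() // request the resume
		<-done   // block until the resume has actually finished
	}()

	time.Sleep(50 * time.Millisecond) // the engine import would happen here
}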
59 changes: 29 additions & 30 deletions br/pkg/lightning/backend/local/localhelper.go
@@ -19,7 +19,6 @@ import (
"context"
"database/sql"
"math"
"regexp"
"runtime"
"sort"
"strings"
@@ -332,14 +331,7 @@ func (local *local) SplitAndScatterRegionByRanges(
}

startTime := time.Now()
scatterCount := 0
for _, region := range scatterRegions {
local.waitForScatterRegion(ctx, region)
if time.Since(startTime) > split.ScatterWaitUpperInterval {
break
}
scatterCount++
}
scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
if scatterCount == len(scatterRegions) {
log.FromContext(ctx).Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
@@ -349,7 +341,8 @@ func (local *local) SplitAndScatterRegionByRanges(
zap.Int("skipped_keys", skippedKeys),
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
zap.Duration("take", time.Since(startTime)),
zap.Error(err))
}
return nil
}
@@ -447,28 +440,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
}
}

func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
if ok {
return
}
if err != nil {
if !common.IsRetryableError(err) {
log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
return
func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) {
subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval)
defer cancel()

for len(regions) > 0 {
var retryRegions []*split.RegionInfo
for _, region := range regions {
scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region)
if scattered {
scatterCount++
continue
}
log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
if err != nil {
if !common.IsRetryableError(err) {
log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err))
return scatterCount, err
}
log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err))
}
retryRegions = append(retryRegions, region)
}
regions = retryRegions
select {
case <-time.After(time.Second):
case <-ctx.Done():
case <-subCtx.Done():
return
}
}
return scatterCount, nil
}

func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
if err != nil {
return false, err
@@ -478,13 +481,9 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
return true, nil
}
// don't return error if region replicate not complete
// TODO: should add a new error type to avoid this check by string matching
matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message)
if matches {
return false, nil
}
return false, errors.Errorf("get operator error: %s", respErr.GetType())
return false, errors.Errorf(
"failed to get region operator, error type: %s, error message: %s",
respErr.GetType().String(), respErr.GetMessage())
}
// If the current operator of the region is not 'scatter-region', we could assume
// that 'scatter-operator' has finished.
6 changes: 6 additions & 0 deletions br/pkg/lightning/common/retry.go
@@ -20,6 +20,7 @@ import (
"io"
"net"
"os"
"regexp"
"syscall"

"github.com/go-sql-driver/mysql"
@@ -30,6 +31,8 @@
"google.golang.org/grpc/status"
)

var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)

// IsRetryableError returns whether the error is transient (e.g. network
// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
// function returns `false` (irrecoverable) if `err == nil`.
@@ -88,6 +91,9 @@ func isSingleRetryableError(err error) bool {
}
return false
default:
if regionNotFullyReplicatedRe.MatchString(err.Error()) {
return true
}
switch status.Code(err) {
case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return true
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry_test.go
@@ -94,4 +94,6 @@ func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)))
require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})))
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))

require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
}
8 changes: 8 additions & 0 deletions br/pkg/lightning/lightning.go
@@ -209,6 +209,14 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

// Enable failpoint http API for testing.
failpoint.Inject("EnableTestAPI", func() {
mux.HandleFunc("/fail/", func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/fail")
new(failpoint.HttpHandler).ServeHTTP(w, r)
})
})

handleTasks := http.StripPrefix("/tasks", http.HandlerFunc(l.handleTask))
mux.Handle("/tasks", httpHandleWrapper(handleTasks.ServeHTTP))
mux.Handle("/tasks/", httpHandleWrapper(handleTasks.ServeHTTP))
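A rough sketch of how a test might drive the failpoint endpoint registered above (assumptions: the failpoint HTTP handler accepts PUT with the failpoint expression as the request body and DELETE to disable, lightning's status server listens on 127.0.0.1:8289, and the failpoint name and expression below are only examples):

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	base := "http://127.0.0.1:8289/fail/"
	name := "github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine"

	// Enable the failpoint; the request body carries the failpoint expression.
	req, err := http.NewRequest(http.MethodPut, base+name, strings.NewReader("sleep(3000)"))
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("enable:", resp.Status)

	// Disable it again once the test is done.
	req, err = http.NewRequest(http.MethodDelete, base+name, nil)
	if err != nil {
		panic(err)
	}
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("disable:", resp.Status)
}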
14 changes: 14 additions & 0 deletions br/pkg/lightning/restore/meta_manager.go
@@ -555,6 +555,8 @@ type taskMetaMgr interface {
// need to update or any new tasks. At most one lightning instance can execute the action function at the same time.
// Note that action may be executed multiple times due to transaction retry, caller should make sure it's idempotent.
CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error
// CanPauseSchedulerByKeyRange returns whether the scheduler can be paused for a specific key range.
CanPauseSchedulerByKeyRange() bool
CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
// CheckAndFinishRestore checks the task meta and returns whether to switch the cluster back to its normal state and clean up the metadata.
// Return values: the first boolean indicates whether to switch the TiDB cluster back to its normal state (restore schedulers, switch TiKV to normal)
@@ -867,6 +869,10 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
}, nil
}

func (m *dbTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

// CheckAndFinishRestore checks the task meta and returns whether to switch the cluster back to its normal state and clean up the metadata.
// Return values: the first boolean indicates whether to switch the TiDB cluster back to its normal state (restore schedulers, switch TiKV to normal);
// the second boolean indicates whether to clean up the metadata in TiDB.
@@ -1058,6 +1064,10 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.
}, nil
}

func (m noopTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return false
}

func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
}
@@ -1168,6 +1178,10 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
return m.pd.RemoveSchedulers(ctx)
}

func (m *singleTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return m.initialized, nil
}
53 changes: 31 additions & 22 deletions br/pkg/lightning/restore/restore.go
@@ -1370,42 +1370,51 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
// make split region and ingest sst more stable
// because the importer backend is mostly used for v3.x clusters which don't support these APIs,
// so we also don't do this for the importer backend
finishSchedulers := func() {}
finishSchedulers := func() {
if rc.taskMgr != nil {
rc.taskMgr.Close()
}
}
// if one lightning failed abnormally, and can't determine whether it needs to switch back,
// we do not do switch back automatically
switchBack := false
cleanup := false
postProgress := func() error { return nil }
if rc.cfg.TikvImporter.Backend == config.BackendLocal {
var restoreFn pdutil.UndoFunc

logTask.Info("removing PD leader&region schedulers")
if !rc.taskMgr.CanPauseSchedulerByKeyRange() {
logTask.Info("removing PD leader&region schedulers")

restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx)
if err != nil {
return errors.Trace(err)
var err error
restoreFn, err = rc.taskMgr.CheckAndPausePdSchedulers(ctx)
if err != nil {
return errors.Trace(err)
}
}

finishSchedulers = func() {
if restoreFn != nil {
taskFinished := finalErr == nil
// use context.Background to make sure this restore function can still be executed even if ctx is canceled
restoreCtx := context.Background()
needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished)
if err != nil {
logTask.Warn("check restore pd schedulers failed", zap.Error(err))
return
}
switchBack = needSwitchBack
if needSwitchBack {
logTask.Info("add back PD leader&region schedulers")
if restoreE := restoreFn(restoreCtx); restoreE != nil {
logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
taskFinished := finalErr == nil
// use context.Background to make sure this restore function can still be executed even if ctx is canceled
restoreCtx := context.Background()
needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished)
if err != nil {
logTask.Warn("check restore pd schedulers failed", zap.Error(err))
return
}
switchBack = needSwitchBack
cleanup = needCleanup

if needSwitchBack && restoreFn != nil {
logTask.Info("add back PD leader&region schedulers")
if restoreE := restoreFn(restoreCtx); restoreE != nil {
logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
cleanup = needCleanup
}

rc.taskMgr.Close()
if rc.taskMgr != nil {
rc.taskMgr.Close()
}
}
}

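A compact sketch of the scheduler-handling decision introduced in the restoreTables hunk above (illustrative only: taskMgr below is a stand-in for the real taskMetaMgr, and CanPauseSchedulerByKeyRange presumably reports whether the connected PD supports pausing scheduling for a key range): when the key-range pause is available, the cluster-wide scheduler pause is skipped entirely and each ImportEngine call pauses scheduling only for the range it writes.

package main

import (
	"context"
	"fmt"
)

// taskMgr is a stand-in for the real taskMetaMgr interface.
type taskMgr struct{ canPauseByKeyRange bool }

func (m taskMgr) CanPauseSchedulerByKeyRange() bool { return m.canPauseByKeyRange }

func (m taskMgr) CheckAndPausePdSchedulers(ctx context.Context) (func(context.Context) error, error) {
	fmt.Println("paused all PD schedulers cluster-wide")
	return func(context.Context) error {
		fmt.Println("restored PD schedulers")
		return nil
	}, nil
}

func run(ctx context.Context, m taskMgr) error {
	var restoreFn func(context.Context) error
	if !m.CanPauseSchedulerByKeyRange() {
		// Fallback: pause every scheduler for the whole restore, as before.
		var err error
		restoreFn, err = m.CheckAndPausePdSchedulers(ctx)
		if err != nil {
			return err
		}
	}
	// Engines are imported here; with the key-range pause available,
	// ImportEngine itself pauses scheduling only for the range it writes.
	if restoreFn != nil {
		return restoreFn(context.Background())
	}
	return nil
}

func main() {
	_ = run(context.Background(), taskMgr{canPauseByKeyRange: true})
	_ = run(context.Background(), taskMgr{canPauseByKeyRange: false})
}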
(diffs for the remaining 8 changed files were not loaded)
