Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: support disable scheduler by key range #34130

Merged
merged 22 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
package errors

import (
"context"
stderrors "errors"

"github.com/pingcap/errors"
)

Expand All @@ -15,6 +18,17 @@ func Is(err error, is *errors.Error) bool {
return errorFound != nil
}

// IsContextCanceled checks whether the is caused by context.Canceled.
// errors.Cause does not work for the error wrapped by %w in fmt.Errorf.
// So we need to call stderrors.Is to unwrap the error.
func IsContextCanceled(err error) bool {
err = errors.Cause(err)
if err == context.Canceled || err == context.DeadlineExceeded {
return true
}
return stderrors.Is(err, context.Canceled) || stderrors.Is(err, context.DeadlineExceeded)
}

// BR errors.
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/errors/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package errors_test

import (
"context"
"net/url"
"testing"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestIsContextCanceled(t *testing.T) {
require.False(t, berrors.IsContextCanceled(nil))
require.False(t, berrors.IsContextCanceled(errors.New("connection closed")))
require.True(t, berrors.IsContextCanceled(context.Canceled))
require.True(t, berrors.IsContextCanceled(context.DeadlineExceeded))
require.True(t, berrors.IsContextCanceled(errors.Trace(context.Canceled)))
require.True(t, berrors.IsContextCanceled(errors.Trace(context.DeadlineExceeded)))
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.Canceled}))
require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.DeadlineExceeded}))
}
24 changes: 24 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,8 +1392,32 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
return err
}

if len(ranges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if several engines with range overlapping start importing at the same time, is the new scheduler compatible with this kind of concurrent operations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's compatible. Different engines will use different label rule id. It guarantees the scheduler is disabled on every engine key range even if overlapping exists.

subCtx, cancel := context.WithCancel(ctx)
defer cancel()

var startKey, endKey []byte
if len(ranges[0].start) > 0 {
startKey = codec.EncodeBytes(nil, ranges[0].start)
}
if len(ranges[len(ranges)-1].end) > 0 {
endKey = codec.EncodeBytes(nil, ranges[len(ranges)-1].end)
}
done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
defer func() {
cancel()
<-done
}()
}

log.FromContext(ctx).Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))

failpoint.Inject("ReadyForImportEngine", func() {})

for {
unfinishedRanges := lf.unfinishedRanges(ranges)
if len(unfinishedRanges) == 0 {
Expand Down
59 changes: 29 additions & 30 deletions br/pkg/lightning/backend/local/localhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"database/sql"
"math"
"regexp"
"runtime"
"sort"
"strings"
Expand Down Expand Up @@ -332,14 +331,7 @@ func (local *local) SplitAndScatterRegionByRanges(
}

startTime := time.Now()
scatterCount := 0
for _, region := range scatterRegions {
local.waitForScatterRegion(ctx, region)
if time.Since(startTime) > split.ScatterWaitUpperInterval {
break
}
scatterCount++
}
scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
if scatterCount == len(scatterRegions) {
log.FromContext(ctx).Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
Expand All @@ -349,7 +341,8 @@ func (local *local) SplitAndScatterRegionByRanges(
zap.Int("skipped_keys", skippedKeys),
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
zap.Duration("take", time.Since(startTime)),
zap.Error(err))
}
return nil
Comment on lines +345 to 347
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle this error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emm, it's hard to say. Until now, we didn't treat scatter region error as a critical error.

}
Expand Down Expand Up @@ -447,28 +440,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
}
}

func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
if ok {
return
}
if err != nil {
if !common.IsRetryableError(err) {
log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
return
func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) {
subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval)
defer cancel()

for len(regions) > 0 {
var retryRegions []*split.RegionInfo
for _, region := range regions {
scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region)
if scattered {
scatterCount++
continue
}
log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
if err != nil {
if !common.IsRetryableError(err) {
log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err))
return scatterCount, err
}
log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err))
}
retryRegions = append(retryRegions, region)
}
regions = retryRegions
select {
case <-time.After(time.Second):
case <-ctx.Done():
case <-subCtx.Done():
return
}
}
return scatterCount, nil
}

func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
if err != nil {
return false, err
Expand All @@ -478,13 +481,9 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
return true, nil
}
// don't return error if region replicate not complete
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to handle this error now, do we?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is moved to retriable errors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, it's caught by the caller.

// TODO: should add a new error type to avoid this check by string matching
matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message)
if matches {
return false, nil
}
return false, errors.Errorf("get operator error: %s", respErr.GetType())
return false, errors.Errorf(
"failed to get region operator, error type: %s, error message: %s",
respErr.GetType().String(), respErr.GetMessage())
}
// If the current operator of the region is not 'scatter-region', we could assume
// that 'scatter-operator' has finished.
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/lightning/common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"net"
"os"
"regexp"
"syscall"

"github.com/go-sql-driver/mysql"
Expand All @@ -30,6 +31,8 @@ import (
"google.golang.org/grpc/status"
)

var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)

// IsRetryableError returns whether the error is transient (e.g. network
// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
// function returns `false` (irrecoverable) if `err == nil`.
Expand Down Expand Up @@ -88,6 +91,9 @@ func isSingleRetryableError(err error) bool {
}
return false
default:
if regionNotFullyReplicatedRe.MatchString(err.Error()) {
return true
}
switch status.Code(err) {
case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return true
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,6 @@ func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)))
require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})))
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))

require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
}
8 changes: 8 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

// Enable failpoint http API for testing.
failpoint.Inject("EnableTestAPI", func() {
mux.HandleFunc("/fail/", func(w http.ResponseWriter, r *http.Request) {
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/fail")
new(failpoint.HttpHandler).ServeHTTP(w, r)
})
})

handleTasks := http.StripPrefix("/tasks", http.HandlerFunc(l.handleTask))
mux.Handle("/tasks", httpHandleWrapper(handleTasks.ServeHTTP))
mux.Handle("/tasks/", httpHandleWrapper(handleTasks.ServeHTTP))
Expand Down
14 changes: 14 additions & 0 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ type taskMetaMgr interface {
// need to update or any new tasks. There is at most one lightning who can execute the action function at the same time.
// Note that action may be executed multiple times due to transaction retry, caller should make sure it's idempotent.
CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error
// CanPauseSchedulerByKeyRange returns whether the scheduler can pause by the key range.
CanPauseSchedulerByKeyRange() bool
CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
Expand Down Expand Up @@ -867,6 +869,10 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
}, nil
}

func (m *dbTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
// the second boolean indicates whether to clean up the metadata in tidb
Expand Down Expand Up @@ -1058,6 +1064,10 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.
}, nil
}

func (m noopTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return false
}

func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
}
Expand Down Expand Up @@ -1168,6 +1178,10 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
return m.pd.RemoveSchedulers(ctx)
}

func (m *singleTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return m.initialized, nil
}
Expand Down
53 changes: 31 additions & 22 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1370,42 +1370,51 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
// make split region and ingest sst more stable
// because importer backend is mostly use for v3.x cluster which doesn't support these api,
// so we also don't do this for import backend
finishSchedulers := func() {}
finishSchedulers := func() {
if rc.taskMgr != nil {
rc.taskMgr.Close()
}
}
// if one lightning failed abnormally, and can't determine whether it needs to switch back,
// we do not do switch back automatically
switchBack := false
cleanup := false
postProgress := func() error { return nil }
if rc.cfg.TikvImporter.Backend == config.BackendLocal {
var restoreFn pdutil.UndoFunc

logTask.Info("removing PD leader&region schedulers")
if !rc.taskMgr.CanPauseSchedulerByKeyRange() {
logTask.Info("removing PD leader&region schedulers")

restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx)
if err != nil {
return errors.Trace(err)
var err error
restoreFn, err = rc.taskMgr.CheckAndPausePdSchedulers(ctx)
if err != nil {
return errors.Trace(err)
}
}

finishSchedulers = func() {
if restoreFn != nil {
taskFinished := finalErr == nil
// use context.Background to make sure this restore function can still be executed even if ctx is canceled
restoreCtx := context.Background()
needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished)
if err != nil {
logTask.Warn("check restore pd schedulers failed", zap.Error(err))
return
}
switchBack = needSwitchBack
if needSwitchBack {
logTask.Info("add back PD leader&region schedulers")
if restoreE := restoreFn(restoreCtx); restoreE != nil {
logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
taskFinished := finalErr == nil
// use context.Background to make sure this restore function can still be executed even if ctx is canceled
restoreCtx := context.Background()
needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished)
if err != nil {
logTask.Warn("check restore pd schedulers failed", zap.Error(err))
return
}
switchBack = needSwitchBack
cleanup = needCleanup

if needSwitchBack && restoreFn != nil {
logTask.Info("add back PD leader&region schedulers")
if restoreE := restoreFn(restoreCtx); restoreE != nil {
logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
cleanup = needCleanup
}

rc.taskMgr.Close()
if rc.taskMgr != nil {
rc.taskMgr.Close()
}
}
}

Expand Down
Loading