lightning: support disable scheduler by key range
sleepymole committed Jun 6, 2022
1 parent 173dd00 commit 080cba1
Showing 9 changed files with 239 additions and 35 deletions.
21 changes: 21 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -1357,6 +1357,27 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
return err
}

if len(ranges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() {
subCtx, cancel := context.WithCancel(ctx)
defer cancel()

var startKey, endKey []byte
if len(ranges[0].start) > 0 {
startKey = codec.EncodeBytes(nil, ranges[0].start)
}
if len(ranges[len(ranges)-1].end) > 0 {
endKey = codec.EncodeBytes(nil, ranges[len(ranges)-1].end)
}
done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
defer func() {
cancel()
<-done
}()
}

log.L().Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))
for {
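A note on the block above: the pause is scoped to the engine's key range (encoded with codec.EncodeBytes, since PD works on encoded keys), and teardown is a cancel-then-wait handshake: cancel() alone is not enough, so the deferred function also blocks on done to make sure ImportEngine does not return while the pause goroutine is still running. Below is a minimal, self-contained Go sketch of that handshake; pauseSchedulersByKeyRange is a hypothetical stand-in for local.pdCtl.PauseSchedulersByKeyRange, whose real body talks to PD.

package main

import (
	"context"
	"fmt"
	"time"
)

// pauseSchedulersByKeyRange is a hypothetical stand-in for
// pdCtl.PauseSchedulersByKeyRange: it keeps the pause alive until ctx is
// cancelled, then closes done so the caller can block until the pause is
// actually lifted.
func pauseSchedulersByKeyRange(ctx context.Context, startKey, endKey []byte) (<-chan struct{}, error) {
	fmt.Printf("schedulers paused for [%x, %x)\n", startKey, endKey)
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// a real implementation would renew the pause with PD here
			case <-ctx.Done():
				fmt.Println("schedulers resumed")
				return
			}
		}
	}()
	return done, nil
}

func main() {
	subCtx, cancel := context.WithCancel(context.Background())
	done, err := pauseSchedulersByKeyRange(subCtx, []byte("t\x80"), []byte("t\x81"))
	if err != nil {
		panic(err)
	}
	defer func() {
		cancel() // stop renewing the pause...
		<-done   // ...and wait until it has really been lifted
	}()
	time.Sleep(120 * time.Millisecond) // stands in for the actual engine import
}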
57 changes: 27 additions & 30 deletions br/pkg/lightning/backend/local/localhelper.go
@@ -18,7 +18,6 @@ import (
"bytes"
"context"
"database/sql"
"regexp"
"runtime"
"sort"
"strings"
@@ -330,24 +329,18 @@ func (local *local) SplitAndScatterRegionByRanges(
}

startTime := time.Now()
scatterCount := 0
for _, region := range scatterRegions {
local.waitForScatterRegion(ctx, region)
if time.Since(startTime) > split.ScatterWaitUpperInterval {
break
}
scatterCount++
}
scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
if scatterCount == len(scatterRegions) {
log.L().Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.L().Info("waiting for scattering regions timeout",
log.L().Info("waiting for scattering regions partially finished",
zap.Int("skipped_keys", skippedKeys),
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
zap.Duration("take", time.Since(startTime)),
zap.Error(err))
}
return nil
}
@@ -445,28 +438,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
}
}

func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
if ok {
return
}
if err != nil {
if !common.IsRetryableError(err) {
log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
return
func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) {
subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval)
defer cancel()

for len(regions) > 0 {
var retryRegions []*split.RegionInfo
for _, region := range regions {
scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region)
if scattered {
scatterCount++
continue
}
log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
if err != nil {
if !common.IsRetryableError(err) {
log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err))
return scatterCount, err
}
log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err))
}
retryRegions = append(retryRegions, region)
}
regions = retryRegions
select {
case <-time.After(time.Second):
case <-ctx.Done():
case <-subCtx.Done():
return
}
}
return scatterCount, nil
}

func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
if err != nil {
return false, err
@@ -476,12 +479,6 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
return true, nil
}
// don't return error if region replicate not complete
// TODO: should add a new error type to avoid this check by string matching
matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message)
if matches {
return false, nil
}
return false, errors.Errorf("get operator error: %s", respErr.GetType())
}
// If the current operator of the region is not 'scatter-region', we could assume
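The rewritten wait loop above replaces the old per-region waitForScatterRegion: instead of retrying one region at a time with a retry counter, waitForScatterRegions sweeps the whole batch, keeps only the regions that are not yet scattered, and bounds everything with a single context.WithTimeout(ctx, split.ScatterWaitUpperInterval). A condensed, runnable sketch of that loop shape over generic items (names and the check function are illustrative, not the real API):

package main

import (
	"context"
	"fmt"
	"time"
)

// waitAll polls check once per item, retries the unfinished ones every
// second, and bounds the whole batch with one timeout, mirroring the
// retry-list shape of waitForScatterRegions.
func waitAll(ctx context.Context, items []string, check func(string) (bool, error)) (finished int, _ error) {
	subCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()

	for len(items) > 0 {
		var retry []string
		for _, it := range items {
			ok, err := check(it)
			if ok {
				finished++
				continue
			}
			if err != nil {
				// the real code only returns when the error is non-retryable
				return finished, err
			}
			retry = append(retry, it)
		}
		items = retry
		select {
		case <-time.After(time.Second):
		case <-subCtx.Done():
			return // give up with a partial count, like the diff's bare return
		}
	}
	return finished, nil
}

func main() {
	polls := map[string]int{}
	n, err := waitAll(context.Background(), []string{"r1", "r2"}, func(id string) (bool, error) {
		polls[id]++
		return polls[id] >= 2, nil // each region "scatters" on its second poll
	})
	fmt.Println(n, err) // 2 <nil>
}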
6 changes: 6 additions & 0 deletions br/pkg/lightning/common/retry.go
@@ -20,6 +20,7 @@ import (
"io"
"net"
"os"
"regexp"
"syscall"

"github.com/go-sql-driver/mysql"
@@ -30,6 +31,8 @@ import (
"google.golang.org/grpc/status"
)

var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)

// IsRetryableError returns whether the error is transient (e.g. network
// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
// function returns `false` (irrecoverable) if `err == nil`.
@@ -88,6 +91,9 @@ func isSingleRetryableError(err error) bool {
}
return false
default:
if regionNotFullyReplicatedRe.MatchString(err.Error()) {
return true
}
switch status.Code(err) {
case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return true
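This is where the string match from localhelper.go moved: rather than special-casing "not fully replicated" at the call site, retry.go compiles the pattern once and classifies the PD message as retryable, so the scatter loop retries it like any other transient failure. A standalone sketch of the same check; isRetryable is a trimmed-down, hypothetical stand-in for isSingleRetryableError that shows only the new branch:

package main

import (
	"errors"
	"fmt"
	"regexp"
)

// PD reports an under-replicated region as a plain message, so a compiled
// regexp recognizes it (same pattern as in retry.go above).
var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)

// isRetryable shows only the new branch of isSingleRetryableError.
func isRetryable(err error) bool {
	return err != nil && regionNotFullyReplicatedRe.MatchString(err.Error())
}

func main() {
	fmt.Println(isRetryable(errors.New("region 1234 is not fully replicated"))) // true
	fmt.Println(isRetryable(errors.New("region 1234 not found")))               // false
	fmt.Println(isRetryable(nil))                                               // false
}

The new assertion in retry_test.go below exercises exactly this case.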
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry_test.go
@@ -94,4 +94,6 @@ func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)))
require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})))
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))

require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
}
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
@@ -532,6 +532,7 @@ type TikvImporter struct {

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
MaxKVWriteBytesPerSec ByteSize `toml:"max-kv-write-bytes-per-sec" json:"max-kv-write-bytes-per-sec"`
}

type Checkpoint struct {
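config.go also gains a MaxKVWriteBytesPerSec knob in this commit; judging by the name, it caps how many KV bytes per second Lightning writes to TiKV (the diff itself does not show the consumer). A small sketch of how the struct tag binds the key, using the json tag from the diff and a simplified ByteSize:

package main

import (
	"encoding/json"
	"fmt"
)

// ByteSize stands in for Lightning's config.ByteSize; the real type also
// parses human-readable TOML values, which this sketch does not attempt.
type ByteSize int64

type tikvImporter struct {
	MaxKVWriteBytesPerSec ByteSize `json:"max-kv-write-bytes-per-sec"`
}

func main() {
	var cfg tikvImporter
	if err := json.Unmarshal([]byte(`{"max-kv-write-bytes-per-sec": 1048576}`), &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.MaxKVWriteBytesPerSec) // 1048576, i.e. a 1 MiB/s cap
}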
14 changes: 14 additions & 0 deletions br/pkg/lightning/restore/meta_manager.go
@@ -505,6 +505,8 @@ type taskMetaMgr interface {
// need to update or any new tasks. At most one lightning instance can execute the action function at the same time.
// Note that action may be executed multiple times due to transaction retry, so the caller should make sure it's idempotent.
CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error
// CanPauseSchedulerByKeyRange returns whether the scheduler can be paused by key range.
CanPauseSchedulerByKeyRange() bool
CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
// CheckAndFinishRestore checks the task meta and returns whether to switch the cluster to normal state and clean up the metadata.
// Return values: the first boolean indicates whether to switch the TiDB cluster back to normal state (restore schedulers, switch TiKV to normal)
@@ -817,6 +819,10 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
}, nil
}

func (m *dbTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

// CheckAndFinishRestore checks the task meta and returns whether to switch the cluster to normal state and clean up the metadata.
// Return values: the first boolean indicates whether to switch the TiDB cluster back to normal state (restore schedulers, switch TiKV to normal),
// the second indicates whether to clean up the metadata in TiDB.
@@ -1002,6 +1008,10 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.
}, nil
}

func (m noopTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return false
}

func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
}
@@ -1106,6 +1116,10 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
return m.pd.RemoveSchedulers(ctx)
}

func (m *singleTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return m.initialized, nil
}
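All three taskMetaMgr implementations now answer CanPauseSchedulerByKeyRange: the DB-backed and single-task managers delegate to the PD controller, while the noop manager (used when no metadata store is available) reports false, so the restore falls back to pausing schedulers globally. A compressed sketch of that dispatch, with heavily simplified types:

package main

import "fmt"

// pdController is a cut-down stand-in for pdutil.PdController.
type pdController struct{ keyRangePauseSupported bool }

func (p pdController) CanPauseSchedulerByKeyRange() bool { return p.keyRangePauseSupported }

type taskMetaMgr interface {
	CanPauseSchedulerByKeyRange() bool
}

// dbTaskMetaMgr (and likewise singleTaskMetaMgr) delegates the question to PD.
type dbTaskMetaMgr struct{ pd pdController }

func (m dbTaskMetaMgr) CanPauseSchedulerByKeyRange() bool { return m.pd.CanPauseSchedulerByKeyRange() }

// noopTaskMetaMgr has no PD controller to ask, so it reports false and the
// restore keeps the old cluster-wide scheduler pause.
type noopTaskMetaMgr struct{}

func (noopTaskMetaMgr) CanPauseSchedulerByKeyRange() bool { return false }

func main() {
	for _, m := range []taskMetaMgr{
		dbTaskMetaMgr{pd: pdController{keyRangePauseSupported: true}},
		noopTaskMetaMgr{},
	} {
		fmt.Println(m.CanPauseSchedulerByKeyRange()) // true, then false
	}
}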
9 changes: 6 additions & 3 deletions br/pkg/lightning/restore/restore.go
@@ -1355,14 +1355,17 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
// make split region and ingest sst more stable
// because importer backend is mostly used for v3.x clusters, which don't support these APIs,
// so we also don't do this for the importer backend
finishSchedulers := func() {}
finishSchedulers := func() {
if rc.taskMgr != nil {
rc.taskMgr.Close()
}
}
// if one lightning failed abnormally, and can't determine whether it needs to switch back,
// we do not do switch back automatically
switchBack := false
cleanup := false
postProgress := func() error { return nil }
if rc.cfg.TikvImporter.Backend == config.BackendLocal {

if rc.cfg.TikvImporter.Backend == config.BackendLocal && !rc.taskMgr.CanPauseSchedulerByKeyRange() {
logTask.Info("removing PD leader&region schedulers")

restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx)
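The restore.go change narrows the global scheduler pause to the one case that still needs it: a local-backend import against a cluster that cannot pause scheduling by key range; otherwise schedulers stay up and ImportEngine pauses them per range on demand. Note also that finishSchedulers now closes rc.taskMgr unconditionally, so the manager is released even on the new path where CheckAndPausePdSchedulers is never called. A tiny sketch of the condition (the wiring around it is illustrative):

package main

import "fmt"

// shouldPauseGlobally reproduces the narrowed condition in restoreTables:
// remove all leader/region schedulers only for a local-backend import
// against a cluster that cannot pause scheduling by key range.
func shouldPauseGlobally(backendIsLocal, canPauseByKeyRange bool) bool {
	return backendIsLocal && !canPauseByKeyRange
}

func main() {
	fmt.Println(shouldPauseGlobally(true, false))  // true: old cluster, pause everything up front
	fmt.Println(shouldPauseGlobally(true, true))   // false: ImportEngine pauses per key range instead
	fmt.Println(shouldPauseGlobally(false, false)) // false: tidb/importer backends never did this
}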