lightning: support disable scheduler by key range #34130

Merged (22 commits) on Jun 24, 2022

Changes from 4 commits
21 changes: 21 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -1357,6 +1357,27 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
return err
}

if len(ranges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() {
Contributor:

What if several engines with overlapping ranges start importing at the same time? Is the new scheduler compatible with this kind of concurrent operation?

Contributor Author:

It's compatible. Different engines use different label rule IDs, which guarantees the scheduler stays disabled on every engine's key range even when the ranges overlap.
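To make that concrete, here is a minimal, self-contained sketch (not code from this PR; pauseRule and newPauseRule are hypothetical stand-ins for pdutil.LabelRule and the rule construction in pauseSchedulerByKeyRangeWithTTL). Every pause request gets a fresh UUID as its rule ID, so rules created by concurrently importing engines never collide even when their key ranges overlap:

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// pauseRule is a simplified stand-in for pdutil.LabelRule.
type pauseRule struct {
	ID       string // unique per pause request
	StartKey string
	EndKey   string
}

// newPauseRule mirrors how pauseSchedulerByKeyRangeWithTTL builds its rule:
// the ID comes from uuid.New(), not from the key range.
func newPauseRule(startKey, endKey string) pauseRule {
	return pauseRule{
		ID:       uuid.New().String(),
		StartKey: startKey,
		EndKey:   endKey,
	}
}

func main() {
	// Two engines with overlapping key ranges still create independent rules,
	// so expiring or removing one engine's rule cannot re-enable scheduling
	// inside the other engine's range.
	a := newPauseRule("aa", "cc")
	b := newPauseRule("bb", "dd")
	fmt.Println(a.ID != b.ID) // true
}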

subCtx, cancel := context.WithCancel(ctx)
defer cancel()

var startKey, endKey []byte
if len(ranges[0].start) > 0 {
startKey = codec.EncodeBytes(nil, ranges[0].start)
}
if len(ranges[len(ranges)-1].end) > 0 {
endKey = codec.EncodeBytes(nil, ranges[len(ranges)-1].end)
}
done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
defer func() {
cancel()
<-done
}()
}

log.L().Info("start import engine", zap.Stringer("uuid", engineUUID),
zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))
for {
61 changes: 30 additions & 31 deletions br/pkg/lightning/backend/local/localhelper.go
@@ -18,7 +18,6 @@ import (
"bytes"
"context"
"database/sql"
"regexp"
"runtime"
"sort"
"strings"
@@ -330,24 +329,18 @@ func (local *local) SplitAndScatterRegionByRanges(
}

startTime := time.Now()
scatterCount := 0
for _, region := range scatterRegions {
local.waitForScatterRegion(ctx, region)
if time.Since(startTime) > split.ScatterWaitUpperInterval {
break
}
scatterCount++
}
scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
if scatterCount == len(scatterRegions) {
log.L().Info("waiting for scattering regions done",
zap.Int("skipped_keys", skippedKeys),
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
} else {
log.L().Info("waiting for scattering regions timeout",
log.L().Info("waiting for scattering regions partially finished",
zap.Int("skipped_keys", skippedKeys),
zap.Int("scatterCount", scatterCount),
zap.Int("regions", len(scatterRegions)),
zap.Duration("take", time.Since(startTime)))
zap.Duration("take", time.Since(startTime)),
zap.Error(err))
}
return nil
Comment on lines +345 to 347
Contributor:

Do we need to handle this error?

Contributor Author:

Hmm, it's hard to say. So far we haven't treated scatter-region errors as critical errors.

}
@@ -445,28 +438,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
}
}

func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
if ok {
return
}
if err != nil {
if !common.IsRetryableError(err) {
log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
return
func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) {
subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval)
defer cancel()

for len(regions) > 0 {
var retryRegions []*split.RegionInfo
for _, region := range regions {
scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region)
if scattered {
scatterCount++
continue
}
log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
if err != nil {
if !common.IsRetryableError(err) {
log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err))
return scatterCount, err
}
log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err))
}
retryRegions = append(retryRegions, region)
}
regions = retryRegions
select {
case <-time.After(time.Second):
case <-ctx.Done():
case <-subCtx.Done():
return
}
}
return scatterCount, nil
}

func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
if err != nil {
return false, err
@@ -476,13 +479,9 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r
if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
return true, nil
}
// don't return error if region replicate not complete
Contributor:

We don't need to handle this error now, do we?

Contributor Author:

It has been moved to the retryable errors (it is now matched in common.IsRetryableError).

Contributor:

Oh, it's caught by the caller.
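For context, a small self-contained sketch (not code from this PR; isRetryableSketch is a hypothetical helper that mimics only the new regexp branch of common.IsRetryableError). The "not fully replicated" message from PD now matches regionNotFullyReplicatedRe in br/pkg/lightning/common/retry.go, so waitForScatterRegions treats it as retryable and keeps polling the region instead of returning the error:

package main

import (
	"fmt"
	"regexp"
)

// Same pattern as regionNotFullyReplicatedRe in common/retry.go.
var notFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)

// isRetryableSketch mimics only the regexp branch of common.IsRetryableError.
func isRetryableSketch(err error) bool {
	return err != nil && notFullyReplicatedRe.MatchString(err.Error())
}

func main() {
	err := fmt.Errorf("region %d is not fully replicated", 1234)
	// Prints true: waitForScatterRegions would keep this region in its retry list.
	fmt.Println(isRetryableSketch(err))
}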

// TODO: should add a new error type to avoid this check by string matching
matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message)
if matches {
return false, nil
}
return false, errors.Errorf("get operator error: %s", respErr.GetType())
return false, errors.Errorf(
"failed to get region operator, error type: %s, error message: %s",
respErr.GetType().String(), respErr.GetMessage())
}
// If the current operator of the region is not 'scatter-region', we could assume
// that 'scatter-operator' has finished.
6 changes: 6 additions & 0 deletions br/pkg/lightning/common/retry.go
@@ -20,6 +20,7 @@ import (
"io"
"net"
"os"
"regexp"
"syscall"

"github.com/go-sql-driver/mysql"
@@ -30,6 +31,8 @@ import (
"google.golang.org/grpc/status"
)

var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)

// IsRetryableError returns whether the error is transient (e.g. network
// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
// function returns `false` (irrecoverable) if `err == nil`.
@@ -88,6 +91,9 @@ func isSingleRetryableError(err error) bool {
}
return false
default:
if regionNotFullyReplicatedRe.MatchString(err.Error()) {
return true
}
switch status.Code(err) {
case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return true
2 changes: 2 additions & 0 deletions br/pkg/lightning/common/retry_test.go
@@ -94,4 +94,6 @@ func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)))
require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})))
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))

require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
}
14 changes: 14 additions & 0 deletions br/pkg/lightning/restore/meta_manager.go
@@ -505,6 +505,8 @@ type taskMetaMgr interface {
// need to update or any new tasks. There is at most one lightning who can execute the action function at the same time.
// Note that action may be executed multiple times due to transaction retry, caller should make sure it's idempotent.
CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error
// CanPauseSchedulerByKeyRange returns whether schedulers can be paused by key range.
CanPauseSchedulerByKeyRange() bool
CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
@@ -817,6 +819,10 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
}, nil
}

func (m *dbTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
// the second boolean indicates whether to clean up the metadata in tidb
@@ -1002,6 +1008,10 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.
}, nil
}

func (m noopTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return false
}

func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
}
@@ -1106,6 +1116,10 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
return m.pd.RemoveSchedulers(ctx)
}

func (m *singleTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
return m.pd.CanPauseSchedulerByKeyRange()
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return m.initialized, nil
}
9 changes: 6 additions & 3 deletions br/pkg/lightning/restore/restore.go
@@ -1355,14 +1355,17 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
// make split region and ingest sst more stable
// because importer backend is mostly use for v3.x cluster which doesn't support these api,
// so we also don't do this for import backend
finishSchedulers := func() {}
finishSchedulers := func() {
if rc.taskMgr != nil {
rc.taskMgr.Close()
}
}
// if one lightning failed abnormally, and can't determine whether it needs to switch back,
// we do not do switch back automatically
switchBack := false
cleanup := false
postProgress := func() error { return nil }
if rc.cfg.TikvImporter.Backend == config.BackendLocal {

if rc.cfg.TikvImporter.Backend == config.BackendLocal && !rc.taskMgr.CanPauseSchedulerByKeyRange() {
logTask.Info("removing PD leader&region schedulers")

restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx)
126 changes: 124 additions & 2 deletions br/pkg/pdutil/pd.go
@@ -6,6 +6,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"io"
@@ -17,6 +18,7 @@ import (

"github.com/coreos/go-semver/semver"
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -36,6 +38,7 @@ const (
regionCountPrefix = "pd/api/v1/stats/region"
storePrefix = "pd/api/v1/store"
schedulerPrefix = "pd/api/v1/schedulers"
regionLabelPrefix = "pd/api/v1/config/region-label/rule"
maxMsgSize = int(128 * units.MiB) // pd.ScanRegion may return a large response
scheduleConfigPrefix = "pd/api/v1/config/schedule"
configPrefix = "pd/api/v1/config"
@@ -94,6 +97,9 @@ var (
// see https://github.com/tikv/pd/pull/3088
pauseConfigVersion = semver.Version{Major: 4, Minor: 0, Patch: 8}

// Since v6.1.0, we can pause schedulers by key range with a TTL.
minVersionForRegionLabelTTL = semver.Version{Major: 6, Minor: 1, Patch: 0}

// Schedulers represent region/leader schedulers which can impact on performance.
Schedulers = map[string]struct{}{
"balance-leader-scheduler": {},
@@ -130,9 +136,9 @@
)

// pdHTTPRequest defines the interface to send a request to pd and return the result in bytes.
type pdHTTPRequest func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error)
type pdHTTPRequest func(ctx context.Context, addr string, prefix string, cli *http.Client, method string, body io.Reader) ([]byte, error)

// pdRequest is a func to send a HTTP to pd and return the result bytes.
// pdRequest is a func to send an HTTP request to pd and return the result bytes.
func pdRequest(
ctx context.Context,
addr string, prefix string,
@@ -709,6 +715,122 @@ func (p *PdController) doRemoveSchedulersWith(
return removedSchedulers, err
}

// RegionLabel is the label of a region. This struct is partially copied from
// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L31.
type RegionLabel struct {
Key string `json:"key"`
Value string `json:"value"`
TTL string `json:"ttl,omitempty"`
StartAt string `json:"start_at,omitempty"`
}

// LabelRule is the rule to assign labels to a region. This struct is partially copied from
// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L41.
type LabelRule struct {
ID string `json:"id"`
Labels []RegionLabel `json:"labels"`
RuleType string `json:"rule_type"`
Data interface{} `json:"data"`
}

// KeyRangeRule contains the start key and end key of the LabelRule. This struct is partially copied from
// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L62.
type KeyRangeRule struct {
StartKeyHex string `json:"start_key"` // hex format start key, for marshal/unmarshal
EndKeyHex string `json:"end_key"` // hex format end key, for marshal/unmarshal
}

// CreateOrUpdateRegionLabelRule creates or updates a region label rule.
func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule LabelRule) error {
reqData, err := json.Marshal(&rule)
if err != nil {
panic(err)
}
var lastErr error
for i, addr := range p.addrs {
_, lastErr = pdRequest(ctx, addr, regionLabelPrefix,
p.cli, http.MethodPost, bytes.NewBuffer(reqData))
if lastErr == nil {
return nil
}
if err := errors.Cause(lastErr); err == context.Canceled || err == context.DeadlineExceeded {
return errors.Trace(err)
}

if i < len(p.addrs) {
log.Warn("failed to create or update region label rule, will try next pd address",
zap.Error(lastErr), zap.String("pdAddr", addr))
}
}
return errors.Trace(lastErr)
}

// PauseSchedulersByKeyRange will pause schedulers for regions in the specific key range.
// This function will spawn a goroutine to keep pausing schedulers periodically until the context is done.
// The returned done channel is used to notify the caller that the background goroutine has exited.
func (p *PdController) PauseSchedulersByKeyRange(ctx context.Context, startKey, endKey []byte) (done <-chan struct{}, err error) {
return p.pauseSchedulerByKeyRangeWithTTL(ctx, startKey, endKey, pauseTimeout)
}

func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context, startKey, endKey []byte, ttl time.Duration) (_done <-chan struct{}, err error) {
rule := LabelRule{
ID: uuid.New().String(),
Labels: []RegionLabel{{
Key: "schedule",
Value: "deny",
TTL: ttl.String(),
}},
RuleType: "key-range",
// Data should be a list of KeyRangeRule when rule type is key-range.
// See https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L169.
Data: []KeyRangeRule{{
StartKeyHex: hex.EncodeToString(startKey),
EndKeyHex: hex.EncodeToString(endKey),
}},
}
done := make(chan struct{})
if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil {
close(done)
return nil, errors.Trace(err)
}

go func() {
defer close(done)
ticker := time.NewTicker(ttl / 3)
defer ticker.Stop()
loop:
for {
select {
case <-ticker.C:
if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil {
if err := errors.Cause(err); err == context.Canceled || err == context.DeadlineExceeded {
break loop
}
log.Warn("pause scheduler by key range failed, ignore it and wait next time pause", zap.Error(err))
}
case <-ctx.Done():
break loop
}
}
// Use a new context because the caller's context may already be canceled.
recoverCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
// Set ttl to 0 to remove the rule.
rule.Labels[0].TTL = time.Duration(0).String()
if err := p.CreateOrUpdateRegionLabelRule(recoverCtx, rule); err != nil {
log.Warn("failed to remove region label rule, the rule will be removed after ttl expires",
zap.String("rule-id", rule.ID), zap.Duration("ttl", ttl), zap.Error(err))
}
}()
return done, nil
}

// CanPauseSchedulerByKeyRange returns whether the scheduler can be paused by key range.
func (p *PdController) CanPauseSchedulerByKeyRange() bool {
// We need ttl feature to ensure scheduler can recover from pause automatically.
return p.version.Compare(minVersionForRegionLabelTTL) >= 0
}

// Close close the connection to pd.
func (p *PdController) Close() {
p.pdClient.Close()