From 080cba1252944d3b43f5ae8cce578ebbbc1ba451 Mon Sep 17 00:00:00 2001
From: Yujie Xia
Date: Mon, 6 Jun 2022 14:55:29 +0800
Subject: [PATCH] lightning: support disable scheduler by key range

---
 br/pkg/lightning/backend/local/local.go       |  21 +++
 br/pkg/lightning/backend/local/localhelper.go |  57 ++++----
 br/pkg/lightning/common/retry.go              |   6 +
 br/pkg/lightning/common/retry_test.go         |   2 +
 br/pkg/lightning/config/config.go             |   1 +
 br/pkg/lightning/restore/meta_manager.go      |  14 ++
 br/pkg/lightning/restore/restore.go           |   9 +-
 br/pkg/pdutil/pd.go                           | 125 +++++++++++++++++-
 br/pkg/pdutil/pd_serial_test.go               |  39 ++++++
 9 files changed, 239 insertions(+), 35 deletions(-)

diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index 0d19c6887c4f8..a635a8c3e3470 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -1357,6 +1357,27 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
 		return err
 	}
 
+	if len(ranges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() {
+		subCtx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		var startKey, endKey []byte
+		if len(ranges[0].start) > 0 {
+			startKey = codec.EncodeBytes(nil, ranges[0].start)
+		}
+		if len(ranges[len(ranges)-1].end) > 0 {
+			endKey = codec.EncodeBytes(nil, ranges[len(ranges)-1].end)
+		}
+		done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		defer func() {
+			cancel()
+			<-done
+		}()
+	}
+
 	log.L().Info("start import engine", zap.Stringer("uuid", engineUUID),
 		zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize))
 	for {
diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go
index 98413b20e71e0..57f4aa1f979a3 100644
--- a/br/pkg/lightning/backend/local/localhelper.go
+++ b/br/pkg/lightning/backend/local/localhelper.go
@@ -18,7 +18,6 @@ import (
 	"bytes"
 	"context"
 	"database/sql"
-	"regexp"
 	"runtime"
 	"sort"
 	"strings"
@@ -330,24 +329,18 @@ func (local *local) SplitAndScatterRegionByRanges(
 	}
 
 	startTime := time.Now()
-	scatterCount := 0
-	for _, region := range scatterRegions {
-		local.waitForScatterRegion(ctx, region)
-		if time.Since(startTime) > split.ScatterWaitUpperInterval {
-			break
-		}
-		scatterCount++
-	}
+	scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
 	if scatterCount == len(scatterRegions) {
 		log.L().Info("waiting for scattering regions done",
 			zap.Int("skipped_keys", skippedKeys),
 			zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
 	} else {
-		log.L().Info("waiting for scattering regions timeout",
+		log.L().Info("waiting for scattering regions partially finished",
 			zap.Int("skipped_keys", skippedKeys),
 			zap.Int("scatterCount", scatterCount),
 			zap.Int("regions", len(scatterRegions)),
-			zap.Duration("take", time.Since(startTime)))
+			zap.Duration("take", time.Since(startTime)),
+			zap.Error(err))
 	}
 	return nil
 }
@@ -445,28 +438,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
 	}
 }
 
-func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
-	for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
-		ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
-		if ok {
-			return
-		}
-		if err != nil {
-			if !common.IsRetryableError(err) {
-				log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
-				return
+func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) {
+	subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval)
+	defer cancel()
+
+	for len(regions) > 0 {
+		var retryRegions []*split.RegionInfo
+		for _, region := range regions {
+			scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region)
+			if scattered {
+				scatterCount++
+				continue
 			}
-			log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
+			if err != nil {
+				if !common.IsRetryableError(err) {
+					log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err))
+					return scatterCount, err
+				}
+				log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err))
+			}
+			retryRegions = append(retryRegions, region)
 		}
+		regions = retryRegions
 		select {
 		case <-time.After(time.Second):
-		case <-ctx.Done():
+		case <-subCtx.Done():
 			return
 		}
 	}
+	return scatterCount, nil
 }
 
-func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
+func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
 	resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
 	if err != nil {
 		return false, err
@@ -476,12 +479,6 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r
 		if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
 			return true, nil
 		}
-		// don't return error if region replicate not complete
-		// TODO: should add a new error type to avoid this check by string matching
-		matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message)
-		if matches {
-			return false, nil
-		}
 		return false, errors.Errorf("get operator error: %s", respErr.GetType())
 	}
 	// If the current operator of the region is not 'scatter-region', we could assume
diff --git a/br/pkg/lightning/common/retry.go b/br/pkg/lightning/common/retry.go
index accf7423414b4..3a7757d650738 100644
--- a/br/pkg/lightning/common/retry.go
+++ b/br/pkg/lightning/common/retry.go
@@ -20,6 +20,7 @@ import (
 	"io"
 	"net"
 	"os"
+	"regexp"
 	"syscall"
 
 	"github.com/go-sql-driver/mysql"
@@ -30,6 +31,8 @@ import (
 	"google.golang.org/grpc/status"
 )
 
+var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)
+
 // IsRetryableError returns whether the error is transient (e.g. network
 // connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
 // function returns `false` (irrecoverable) if `err == nil`.
@@ -88,6 +91,9 @@ func isSingleRetryableError(err error) bool {
 		}
 		return false
 	default:
+		if regionNotFullyReplicatedRe.MatchString(err.Error()) {
+			return true
+		}
 		switch status.Code(err) {
 		case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
 			return true
diff --git a/br/pkg/lightning/common/retry_test.go b/br/pkg/lightning/common/retry_test.go
index 670004260f5a1..b707a8d1d1c5f 100644
--- a/br/pkg/lightning/common/retry_test.go
+++ b/br/pkg/lightning/common/retry_test.go
@@ -94,4 +94,6 @@ func TestIsRetryableError(t *testing.T) {
 	require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)))
 	require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})))
 	require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))
+
+	require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
 }
diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go
index fee2aaf29deb2..b8caa0e09e888 100644
--- a/br/pkg/lightning/config/config.go
+++ b/br/pkg/lightning/config/config.go
@@ -532,6 +532,7 @@ type TikvImporter struct {
 
 	EngineMemCacheSize      ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
 	LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
+	MaxKVWriteBytesPerSec   ByteSize `toml:"max-kv-write-bytes-per-sec" json:"max-kv-write-bytes-per-sec"`
 }
 
 type Checkpoint struct {
diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go
index 8eace8c5f979d..42c6e3804c0ba 100644
--- a/br/pkg/lightning/restore/meta_manager.go
+++ b/br/pkg/lightning/restore/meta_manager.go
@@ -505,6 +505,8 @@ type taskMetaMgr interface {
 	// need to update or any new tasks. There is at most one lightning who can execute the action function at the same time.
 	// Note that action may be executed multiple times due to transaction retry, caller should make sure it's idempotent.
 	CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error
+	// CanPauseSchedulerByKeyRange returns whether the scheduler can be paused by the key range.
+	CanPauseSchedulerByKeyRange() bool
 	CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
 	// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
 	// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
@@ -817,6 +819,10 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
 	}, nil
 }
 
+func (m *dbTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
+	return m.pd.CanPauseSchedulerByKeyRange()
+}
+
 // CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
 // Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
 // the second boolean indicates whether to clean up the metadata in tidb
@@ -1002,6 +1008,10 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.
 	}, nil
 }
 
+func (m noopTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
+	return false
+}
+
 func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
 	return true, nil
 }
@@ -1106,6 +1116,10 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
 	return m.pd.RemoveSchedulers(ctx)
 }
 
+func (m *singleTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
+	return m.pd.CanPauseSchedulerByKeyRange()
+}
+
 func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
 	return m.initialized, nil
 }
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index c776510ae3c9d..eba34f37c7c9f 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -1355,14 +1355,17 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
 	// make split region and ingest sst more stable
 	// because importer backend is mostly use for v3.x cluster which doesn't support these api,
 	// so we also don't do this for import backend
-	finishSchedulers := func() {}
+	finishSchedulers := func() {
+		if rc.taskMgr != nil {
+			rc.taskMgr.Close()
+		}
+	}
 	// if one lightning failed abnormally, and can't determine whether it needs to switch back,
 	// we do not do switch back automatically
 	switchBack := false
 	cleanup := false
 	postProgress := func() error { return nil }
-	if rc.cfg.TikvImporter.Backend == config.BackendLocal {
-
+	if rc.cfg.TikvImporter.Backend == config.BackendLocal && !rc.taskMgr.CanPauseSchedulerByKeyRange() {
 		logTask.Info("removing PD leader&region schedulers")
 
 		restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx)
diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go
index 599eefe30afb5..933141944b998 100644
--- a/br/pkg/pdutil/pd.go
+++ b/br/pkg/pdutil/pd.go
@@ -6,6 +6,7 @@ import (
 	"bytes"
 	"context"
 	"crypto/tls"
+	"encoding/hex"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -17,6 +18,7 @@ import (
 
 	"github.com/coreos/go-semver/semver"
 	"github.com/docker/go-units"
+	"github.com/google/uuid"
 	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
@@ -36,6 +38,7 @@ const (
 	regionCountPrefix    = "pd/api/v1/stats/region"
 	storePrefix          = "pd/api/v1/store"
 	schedulerPrefix      = "pd/api/v1/schedulers"
+	regionLabelPrefix    = "pd/api/v1/config/region-label/rule"
 	maxMsgSize           = int(128 * units.MiB) // pd.ScanRegion may return a large response
 	scheduleConfigPrefix = "pd/api/v1/config/schedule"
 	configPrefix         = "pd/api/v1/config"
@@ -94,6 +97,9 @@ var (
 	// see https://github.com/tikv/pd/pull/3088
 	pauseConfigVersion = semver.Version{Major: 4, Minor: 0, Patch: 8}
 
+	// Since v6.1.0, we can pause schedulers by key range with TTL.
+	minVersionForRegionLabelTTL = semver.Version{Major: 6, Minor: 1, Patch: 0}
+
 	// Schedulers represent region/leader schedulers which can impact on performance.
 	Schedulers = map[string]struct{}{
 		"balance-leader-scheduler":     {},
@@ -130,9 +136,9 @@ var (
 )
 
 // pdHTTPRequest defines the interface to send a request to pd and return the result in bytes.
-type pdHTTPRequest func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error)
+type pdHTTPRequest func(ctx context.Context, addr string, prefix string, cli *http.Client, method string, body io.Reader) ([]byte, error)
 
-// pdRequest is a func to send a HTTP to pd and return the result bytes.
+// pdRequest is a func to send an HTTP request to pd and return the result bytes.
 func pdRequest(
 	ctx context.Context,
 	addr string, prefix string,
@@ -709,6 +715,121 @@ func (p *PdController) doRemoveSchedulersWith(
 	return removedSchedulers, err
 }
 
+// RegionLabel is the label of a region. This struct is partially copied from
+// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L31.
+type RegionLabel struct {
+	Key     string `json:"key"`
+	Value   string `json:"value"`
+	TTL     string `json:"ttl,omitempty"`
+	StartAt string `json:"start_at,omitempty"`
+}
+
+// LabelRule is the rule to assign labels to a region. This struct is partially copied from
+// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L41.
+type LabelRule struct {
+	ID       string        `json:"id"`
+	Labels   []RegionLabel `json:"labels"`
+	RuleType string        `json:"rule_type"`
+	Data     interface{}   `json:"data"`
+}
+
+// KeyRangeRule contains the start key and end key of the LabelRule. This struct is partially copied from
+// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L62.
+type KeyRangeRule struct {
+	StartKeyHex string `json:"start_key"` // hex format start key, for marshal/unmarshal
+	EndKeyHex   string `json:"end_key"`   // hex format end key, for marshal/unmarshal
+}
+
+// CreateOrUpdateRegionLabelRule creates or updates a region label rule.
+func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule LabelRule) error {
+	reqData, err := json.Marshal(&rule)
+	if err != nil {
+		panic(err)
+	}
+	var lastErr error
+	for i, addr := range p.addrs {
+		_, lastErr = pdRequest(ctx, addr, regionLabelPrefix,
+			p.cli, http.MethodPost, bytes.NewBuffer(reqData))
+		if lastErr == nil {
+			return nil
+		}
+		if ctx.Err() != nil {
+			return errors.Cause(ctx.Err())
+		}
+		if i < len(p.addrs) {
+			log.Warn("failed to create or update region label rule, will try next pd address",
+				zap.Error(lastErr), zap.String("pdAddr", addr))
+		}
+	}
+	return errors.Trace(lastErr)
+}
+
+// PauseSchedulersByKeyRange will pause schedulers for regions in the specified key range.
+// This function will spawn a goroutine to keep pausing schedulers periodically until the context is done.
+// The returned done channel notifies the caller that the background goroutine has exited.
+func (p *PdController) PauseSchedulersByKeyRange(ctx context.Context, startKey, endKey []byte) (done <-chan struct{}, err error) {
+	return p.pauseSchedulerByKeyRangeWithTTL(ctx, startKey, endKey, pauseTimeout)
+}
+
+func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context, startKey, endKey []byte, ttl time.Duration) (_done <-chan struct{}, err error) {
+	rule := LabelRule{
+		ID: uuid.New().String(),
+		Labels: []RegionLabel{{
+			Key:   "schedule",
+			Value: "deny",
+			TTL:   ttl.String(),
+		}},
+		RuleType: "key-range",
+		// Data should be a list of KeyRangeRule when rule type is key-range.
+		// See https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L169.
+		Data: []KeyRangeRule{{
+			StartKeyHex: hex.EncodeToString(startKey),
+			EndKeyHex:   hex.EncodeToString(endKey),
+		}},
+	}
+	done := make(chan struct{})
+	if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil {
+		close(done)
+		return nil, errors.Trace(err)
+	}
+
+	go func() {
+		defer close(done)
+		ticker := time.NewTicker(ttl / 3)
+		defer ticker.Stop()
+	loop:
+		for {
+			select {
+			case <-ticker.C:
+				if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil {
+					if ctx.Err() != nil {
+						break loop
+					}
+					log.Warn("pause scheduler by key range failed, ignore it and wait next time pause", zap.Error(err))
+				}
+			case <-ctx.Done():
+				break loop
+			}
+		}
+		// Use a new context since the caller's context may already be canceled.
+		recoverCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+		defer cancel()
+		// Set ttl to 0 to remove the rule.
+		rule.Labels[0].TTL = time.Duration(0).String()
+		if err := p.CreateOrUpdateRegionLabelRule(recoverCtx, rule); err != nil {
+			log.Warn("failed to remove region label rule, the rule will be removed after ttl expires",
+				zap.String("rule-id", rule.ID), zap.Duration("ttl", ttl), zap.Error(err))
+		}
+	}()
+	return done, nil
+}
+
+// CanPauseSchedulerByKeyRange returns whether the scheduler can be paused by key range.
+func (p *PdController) CanPauseSchedulerByKeyRange() bool {
+	// We need the TTL feature to ensure schedulers recover from the pause automatically.
+	return p.version.Compare(minVersionForRegionLabelTTL) >= 0
+}
+
 // Close close the connection to pd.
 func (p *PdController) Close() {
 	p.pdClient.Close()
diff --git a/br/pkg/pdutil/pd_serial_test.go b/br/pkg/pdutil/pd_serial_test.go
index 05f0d34aa2ef2..73eb4048b278e 100644
--- a/br/pkg/pdutil/pd_serial_test.go
+++ b/br/pkg/pdutil/pd_serial_test.go
@@ -13,6 +13,7 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"testing"
+	"time"
 
 	"github.com/coreos/go-semver/semver"
 	"github.com/pingcap/failpoint"
@@ -231,3 +232,41 @@ func TestStoreInfo(t *testing.T) {
 	require.Equal(t, "Tombstone", resp.Store.StateName)
 	require.Equal(t, uint64(1024), uint64(resp.Status.Available))
 }
+
+func TestPauseSchedulersByKeyRange(t *testing.T) {
+	const ttl = time.Second
+
+	labelExpires := make(map[string]time.Time)
+
+	httpSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		var labelRule LabelRule
+		err := json.NewDecoder(r.Body).Decode(&labelRule)
+		require.NoError(t, err)
+		require.Len(t, labelRule.Labels, 1)
+		regionLabel := labelRule.Labels[0]
+		require.Equal(t, "schedule", regionLabel.Key)
+		require.Equal(t, "deny", regionLabel.Value)
+		reqTTL, err := time.ParseDuration(regionLabel.TTL)
+		require.NoError(t, err)
+		if reqTTL == 0 {
+			delete(labelExpires, labelRule.ID)
+		} else {
+			require.Equal(t, ttl, reqTTL)
+			if expire, ok := labelExpires[labelRule.ID]; ok {
+				require.True(t, expire.After(time.Now()), "should not expire before now")
+			}
+			labelExpires[labelRule.ID] = time.Now().Add(ttl)
+		}
+	}))
+	defer httpSrv.Close()
+
+	pdController := &PdController{addrs: []string{httpSrv.URL}, cli: http.DefaultClient}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	done, err := pdController.pauseSchedulerByKeyRangeWithTTL(ctx, []byte{0, 0, 0, 0}, []byte{0xff, 0xff, 0xff, 0xff}, ttl)
+	require.NoError(t, err)
+	time.Sleep(ttl * 3)
+	cancel()
+	<-done
+	require.Len(t, labelExpires, 0)
+}
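
Reviewer note: below is a minimal sketch, outside this patch, of how a caller could drive the new pdutil API; it mirrors the cancel-then-wait pattern used in local.ImportEngine above. The package name, the pauseWhileIngesting function, and the ingest callback are illustrative assumptions, not part of the change.

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/pdutil"
)

// pauseWhileIngesting is a hypothetical caller of the API added in this patch.
// pdCtl is an already-constructed *pdutil.PdController; startKey/endKey are
// already-encoded region keys covering the data to be ingested.
func pauseWhileIngesting(ctx context.Context, pdCtl *pdutil.PdController, startKey, endKey []byte, ingest func(context.Context) error) error {
	if !pdCtl.CanPauseSchedulerByKeyRange() {
		// PD older than v6.1.0: this feature is unavailable, so callers keep
		// the existing behavior (e.g. pausing schedulers globally).
		return ingest(ctx)
	}

	subCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Registers a "schedule: deny" region-label rule for the key range and keeps
	// refreshing its TTL in a background goroutine.
	done, err := pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey)
	if err != nil {
		return errors.Trace(err)
	}
	// Stop refreshing and wait for the goroutine to remove the rule (it re-posts
	// the rule with TTL 0) before returning.
	defer func() {
		cancel()
		<-done
	}()

	return ingest(ctx)
}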