From 080cba1252944d3b43f5ae8cce578ebbbc1ba451 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Mon, 6 Jun 2022 14:55:29 +0800 Subject: [PATCH 01/14] lightning: support disable scheduler by key range --- br/pkg/lightning/backend/local/local.go | 21 +++ br/pkg/lightning/backend/local/localhelper.go | 57 ++++---- br/pkg/lightning/common/retry.go | 6 + br/pkg/lightning/common/retry_test.go | 2 + br/pkg/lightning/config/config.go | 1 + br/pkg/lightning/restore/meta_manager.go | 14 ++ br/pkg/lightning/restore/restore.go | 9 +- br/pkg/pdutil/pd.go | 125 +++++++++++++++++- br/pkg/pdutil/pd_serial_test.go | 39 ++++++ 9 files changed, 239 insertions(+), 35 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 0d19c6887c4f8..a635a8c3e3470 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1357,6 +1357,27 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi return err } + if len(ranges) > 0 && local.pdCtl.CanPauseSchedulerByKeyRange() { + subCtx, cancel := context.WithCancel(ctx) + defer cancel() + + var startKey, endKey []byte + if len(ranges[0].start) > 0 { + startKey = codec.EncodeBytes(nil, ranges[0].start) + } + if len(ranges[len(ranges)-1].end) > 0 { + endKey = codec.EncodeBytes(nil, ranges[len(ranges)-1].end) + } + done, err := local.pdCtl.PauseSchedulersByKeyRange(subCtx, startKey, endKey) + if err != nil { + return errors.Trace(err) + } + defer func() { + cancel() + <-done + }() + } + log.L().Info("start import engine", zap.Stringer("uuid", engineUUID), zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize)) for { diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index 98413b20e71e0..57f4aa1f979a3 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "database/sql" - "regexp" "runtime" "sort" "strings" @@ -330,24 +329,18 @@ func (local *local) SplitAndScatterRegionByRanges( } startTime := time.Now() - scatterCount := 0 - for _, region := range scatterRegions { - local.waitForScatterRegion(ctx, region) - if time.Since(startTime) > split.ScatterWaitUpperInterval { - break - } - scatterCount++ - } + scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions) if scatterCount == len(scatterRegions) { log.L().Info("waiting for scattering regions done", zap.Int("skipped_keys", skippedKeys), zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime))) } else { - log.L().Info("waiting for scattering regions timeout", + log.L().Info("waiting for scattering regions partially finished", zap.Int("skipped_keys", skippedKeys), zap.Int("scatterCount", scatterCount), zap.Int("regions", len(scatterRegions)), - zap.Duration("take", time.Since(startTime))) + zap.Duration("take", time.Since(startTime)), + zap.Error(err)) } return nil } @@ -445,28 +438,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) { } } -func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) { - for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ { - ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo) - if ok { - return - } - if err != nil { - if !common.IsRetryableError(err) { - log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err)) - return +func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) { + subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval) + defer cancel() + + for len(regions) > 0 { + var retryRegions []*split.RegionInfo + for _, region := range regions { + scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region) + if scattered { + scatterCount++ + continue } - log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err)) + if err != nil { + if !common.IsRetryableError(err) { + log.L().Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err)) + return scatterCount, err + } + log.L().Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err)) + } + retryRegions = append(retryRegions, region) } + regions = retryRegions select { case <-time.After(time.Second): - case <-ctx.Done(): + case <-subCtx.Done(): return } } + return scatterCount, nil } -func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { +func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId()) if err != nil { return false, err @@ -476,12 +479,6 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND { return true, nil } - // don't return error if region replicate not complete - // TODO: should add a new error type to avoid this check by string matching - matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message) - if matches { - return false, nil - } return false, errors.Errorf("get operator error: %s", respErr.GetType()) } // If the current operator of the region is not 'scatter-region', we could assume diff --git a/br/pkg/lightning/common/retry.go b/br/pkg/lightning/common/retry.go index accf7423414b4..3a7757d650738 100644 --- a/br/pkg/lightning/common/retry.go +++ b/br/pkg/lightning/common/retry.go @@ -20,6 +20,7 @@ import ( "io" "net" "os" + "regexp" "syscall" "github.com/go-sql-driver/mysql" @@ -30,6 +31,8 @@ import ( "google.golang.org/grpc/status" ) +var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`) + // IsRetryableError returns whether the error is transient (e.g. network // connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This // function returns `false` (irrecoverable) if `err == nil`. @@ -88,6 +91,9 @@ func isSingleRetryableError(err error) bool { } return false default: + if regionNotFullyReplicatedRe.MatchString(err.Error()) { + return true + } switch status.Code(err) { case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: return true diff --git a/br/pkg/lightning/common/retry_test.go b/br/pkg/lightning/common/retry_test.go index 670004260f5a1..b707a8d1d1c5f 100644 --- a/br/pkg/lightning/common/retry_test.go +++ b/br/pkg/lightning/common/retry_test.go @@ -94,4 +94,6 @@ func TestIsRetryableError(t *testing.T) { require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled))) require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true}))) require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true}))) + + require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234))) } diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index fee2aaf29deb2..b8caa0e09e888 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -532,6 +532,7 @@ type TikvImporter struct { EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` + MaxKVWriteBytesPerSec ByteSize `toml:"max-kv-write-bytes-per-sec" json:"max-kv-write-bytes-per-sec"` } type Checkpoint struct { diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 8eace8c5f979d..42c6e3804c0ba 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -505,6 +505,8 @@ type taskMetaMgr interface { // need to update or any new tasks. There is at most one lightning who can execute the action function at the same time. // Note that action may be executed multiple times due to transaction retry, caller should make sure it's idempotent. CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error + // CanPauseSchedulerByKeyRange returns whether the scheduler can pause by the key range. + CanPauseSchedulerByKeyRange() bool CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) // CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata // Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal) @@ -817,6 +819,10 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U }, nil } +func (m *dbTaskMetaMgr) CanPauseSchedulerByKeyRange() bool { + return m.pd.CanPauseSchedulerByKeyRange() +} + // CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata // Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal) // the second boolean indicates whether to clean up the metadata in tidb @@ -1002,6 +1008,10 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil. }, nil } +func (m noopTaskMetaMgr) CanPauseSchedulerByKeyRange() bool { + return false +} + func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { return true, nil } @@ -1106,6 +1116,10 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut return m.pd.RemoveSchedulers(ctx) } +func (m *singleTaskMetaMgr) CanPauseSchedulerByKeyRange() bool { + return m.pd.CanPauseSchedulerByKeyRange() +} + func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { return m.initialized, nil } diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index c776510ae3c9d..eba34f37c7c9f 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1355,14 +1355,17 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { // make split region and ingest sst more stable // because importer backend is mostly use for v3.x cluster which doesn't support these api, // so we also don't do this for import backend - finishSchedulers := func() {} + finishSchedulers := func() { + if rc.taskMgr != nil { + rc.taskMgr.Close() + } + } // if one lightning failed abnormally, and can't determine whether it needs to switch back, // we do not do switch back automatically switchBack := false cleanup := false postProgress := func() error { return nil } - if rc.cfg.TikvImporter.Backend == config.BackendLocal { - + if rc.cfg.TikvImporter.Backend == config.BackendLocal && !rc.taskMgr.CanPauseSchedulerByKeyRange() { logTask.Info("removing PD leader®ion schedulers") restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx) diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index 599eefe30afb5..933141944b998 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "crypto/tls" + "encoding/hex" "encoding/json" "fmt" "io" @@ -17,6 +18,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/docker/go-units" + "github.com/google/uuid" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -36,6 +38,7 @@ const ( regionCountPrefix = "pd/api/v1/stats/region" storePrefix = "pd/api/v1/store" schedulerPrefix = "pd/api/v1/schedulers" + regionLabelPrefix = "pd/api/v1/config/region-label/rule" maxMsgSize = int(128 * units.MiB) // pd.ScanRegion may return a large response scheduleConfigPrefix = "pd/api/v1/config/schedule" configPrefix = "pd/api/v1/config" @@ -94,6 +97,9 @@ var ( // see https://github.com/tikv/pd/pull/3088 pauseConfigVersion = semver.Version{Major: 4, Minor: 0, Patch: 8} + // After v6.1.0 version, we can pause schedulers by key range with TTL. + minVersionForRegionLabelTTL = semver.Version{Major: 6, Minor: 1, Patch: 0} + // Schedulers represent region/leader schedulers which can impact on performance. Schedulers = map[string]struct{}{ "balance-leader-scheduler": {}, @@ -130,9 +136,9 @@ var ( ) // pdHTTPRequest defines the interface to send a request to pd and return the result in bytes. -type pdHTTPRequest func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) +type pdHTTPRequest func(ctx context.Context, addr string, prefix string, cli *http.Client, method string, body io.Reader) ([]byte, error) -// pdRequest is a func to send a HTTP to pd and return the result bytes. +// pdRequest is a func to send an HTTP to pd and return the result bytes. func pdRequest( ctx context.Context, addr string, prefix string, @@ -709,6 +715,121 @@ func (p *PdController) doRemoveSchedulersWith( return removedSchedulers, err } +// RegionLabel is the label of a region. This struct is partially copied from +// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L31. +type RegionLabel struct { + Key string `json:"key"` + Value string `json:"value"` + TTL string `json:"ttl,omitempty"` + StartAt string `json:"start_at,omitempty"` +} + +// LabelRule is the rule to assign labels to a region. This struct is partially copied from +// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L41. +type LabelRule struct { + ID string `json:"id"` + Labels []RegionLabel `json:"labels"` + RuleType string `json:"rule_type"` + Data interface{} `json:"data"` +} + +// KeyRangeRule contains the start key and end key of the LabelRule. This struct is partially copied from +// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L62. +type KeyRangeRule struct { + StartKeyHex string `json:"start_key"` // hex format start key, for marshal/unmarshal + EndKeyHex string `json:"end_key"` // hex format end key, for marshal/unmarshal +} + +// CreateOrUpdateRegionLabelRule creates or updates a region label rule. +func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule LabelRule) error { + reqData, err := json.Marshal(&rule) + if err != nil { + panic(err) + } + var lastErr error + for i, addr := range p.addrs { + _, lastErr = pdRequest(ctx, addr, regionLabelPrefix, + p.cli, http.MethodPost, bytes.NewBuffer(reqData)) + if lastErr == nil { + return nil + } + if ctx.Err() != nil { + return errors.Cause(ctx.Err()) + } + if i < len(p.addrs) { + log.Warn("failed to create or update region label rule, will try next pd address", + zap.Error(lastErr), zap.String("pdAddr", addr)) + } + } + return errors.Trace(lastErr) +} + +// PauseSchedulersByKeyRange will pause schedulers for regions in the specific key range. +// This function will spawn a goroutine to keep pausing schedulers periodically until the context is done. +// The return done channel is used to notify the caller that the background goroutine is exited. +func (p *PdController) PauseSchedulersByKeyRange(ctx context.Context, startKey, endKey []byte) (done <-chan struct{}, err error) { + return p.pauseSchedulerByKeyRangeWithTTL(ctx, startKey, endKey, pauseTimeout) +} + +func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context, startKey, endKey []byte, ttl time.Duration) (_done <-chan struct{}, err error) { + rule := LabelRule{ + ID: uuid.New().String(), + Labels: []RegionLabel{{ + Key: "schedule", + Value: "deny", + TTL: ttl.String(), + }}, + RuleType: "key-range", + // Data should be a list of KeyRangeRule when rule type is key-range. + // See https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L169. + Data: []KeyRangeRule{{ + StartKeyHex: hex.EncodeToString(startKey), + EndKeyHex: hex.EncodeToString(endKey), + }}, + } + done := make(chan struct{}) + if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil { + close(done) + return nil, errors.Trace(err) + } + + go func() { + defer close(done) + ticker := time.NewTicker(ttl / 3) + defer ticker.Stop() + loop: + for { + select { + case <-ticker.C: + if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil { + if ctx.Err() != nil { + break loop + } + log.Warn("pause scheduler by key range failed, ignore it and wait next time pause", zap.Error(err)) + } + case <-ctx.Done(): + break loop + } + } + // Use a new context to avoid the context is canceled by the caller. + recoverCtx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + // Set ttl to 0 to remove the rule. + rule.Labels[0].TTL = time.Duration(0).String() + if err := p.CreateOrUpdateRegionLabelRule(recoverCtx, rule); err != nil { + log.Warn("failed to remove region label rule, the rule will be removed after ttl expires", + zap.String("rule-id", rule.ID), zap.Duration("ttl", ttl), zap.Error(err)) + } + }() + return done, nil +} + +// CanPauseSchedulerByKeyRange returns whether the scheduler can be paused by key range. +func (p *PdController) CanPauseSchedulerByKeyRange() bool { + // We need ttl feature to ensure scheduler can recover from pause automatically. + return p.version.Compare(minVersionForRegionLabelTTL) >= 0 +} + // Close close the connection to pd. func (p *PdController) Close() { p.pdClient.Close() diff --git a/br/pkg/pdutil/pd_serial_test.go b/br/pkg/pdutil/pd_serial_test.go index 05f0d34aa2ef2..73eb4048b278e 100644 --- a/br/pkg/pdutil/pd_serial_test.go +++ b/br/pkg/pdutil/pd_serial_test.go @@ -13,6 +13,7 @@ import ( "net/http/httptest" "net/url" "testing" + "time" "github.com/coreos/go-semver/semver" "github.com/pingcap/failpoint" @@ -231,3 +232,41 @@ func TestStoreInfo(t *testing.T) { require.Equal(t, "Tombstone", resp.Store.StateName) require.Equal(t, uint64(1024), uint64(resp.Status.Available)) } + +func TestPauseSchedulersByKeyRange(t *testing.T) { + const ttl = time.Second + + labelExpires := make(map[string]time.Time) + + httpSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var labelRule LabelRule + err := json.NewDecoder(r.Body).Decode(&labelRule) + require.NoError(t, err) + require.Len(t, labelRule.Labels, 1) + regionLabel := labelRule.Labels[0] + require.Equal(t, "schedule", regionLabel.Key) + require.Equal(t, "deny", regionLabel.Value) + reqTTL, err := time.ParseDuration(regionLabel.TTL) + require.NoError(t, err) + if reqTTL == 0 { + delete(labelExpires, labelRule.ID) + } else { + require.Equal(t, ttl, reqTTL) + if expire, ok := labelExpires[labelRule.ID]; ok { + require.True(t, expire.After(time.Now()), "should not expire before now") + } + labelExpires[labelRule.ID] = time.Now().Add(ttl) + } + })) + defer httpSrv.Close() + + pdController := &PdController{addrs: []string{httpSrv.URL}, cli: http.DefaultClient} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done, err := pdController.pauseSchedulerByKeyRangeWithTTL(ctx, []byte{0, 0, 0, 0}, []byte{0xff, 0xff, 0xff, 0xff}, ttl) + require.NoError(t, err) + time.Sleep(ttl * 3) + cancel() + <-done + require.Len(t, labelExpires, 0) +} From d19965a461b67c6bc8a5f862c70385c237c1e259 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Tue, 7 Jun 2022 17:33:42 +0800 Subject: [PATCH 02/14] remove unused option --- br/pkg/lightning/config/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index b8caa0e09e888..fee2aaf29deb2 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -532,7 +532,6 @@ type TikvImporter struct { EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` - MaxKVWriteBytesPerSec ByteSize `toml:"max-kv-write-bytes-per-sec" json:"max-kv-write-bytes-per-sec"` } type Checkpoint struct { From 42bf379b79afc02648a84b44664e295c56aef58a Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Thu, 16 Jun 2022 16:28:26 +0800 Subject: [PATCH 03/14] address comments --- br/pkg/lightning/backend/local/localhelper.go | 4 +++- br/pkg/pdutil/pd.go | 7 ++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go index 57f4aa1f979a3..e7160bcd437de 100644 --- a/br/pkg/lightning/backend/local/localhelper.go +++ b/br/pkg/lightning/backend/local/localhelper.go @@ -479,7 +479,9 @@ func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionI if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND { return true, nil } - return false, errors.Errorf("get operator error: %s", respErr.GetType()) + return false, errors.Errorf( + "failed to get region operator, error type: %s, error message: %s", + respErr.GetType().String(), respErr.GetMessage()) } // If the current operator of the region is not 'scatter-region', we could assume // that 'scatter-operator' has finished. diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index 933141944b998..c63ac0c4dcdee 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -753,9 +753,10 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L if lastErr == nil { return nil } - if ctx.Err() != nil { - return errors.Cause(ctx.Err()) + if err := errors.Cause(err); err == context.Canceled || err == context.DeadlineExceeded { + return errors.Trace(err) } + if i < len(p.addrs) { log.Warn("failed to create or update region label rule, will try next pd address", zap.Error(lastErr), zap.String("pdAddr", addr)) @@ -802,7 +803,7 @@ func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context, star select { case <-ticker.C: if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil { - if ctx.Err() != nil { + if err := errors.Cause(err); err == context.Canceled || err == context.DeadlineExceeded { break loop } log.Warn("pause scheduler by key range failed, ignore it and wait next time pause", zap.Error(err)) From 3ef1b43536c4fb7368d5e4dac80ba4d643e7977f Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Mon, 20 Jun 2022 17:51:58 +0800 Subject: [PATCH 04/14] add integration test --- br/pkg/lightning/backend/local/local.go | 3 + br/pkg/lightning/lightning.go | 8 ++ br/pkg/pdutil/pd.go | 26 +++++- .../config.toml | 0 .../data/test-schema-create.sql | 1 + .../data/test.t-schema.sql | 1 + .../data/test.t.sql | 1 + .../run.sh | 90 +++++++++++++++++++ 8 files changed, 127 insertions(+), 3 deletions(-) mode change 100644 => 100755 br/pkg/pdutil/pd.go create mode 100644 br/tests/lightning_disable_scheduler_by_key_range/config.toml create mode 100644 br/tests/lightning_disable_scheduler_by_key_range/data/test-schema-create.sql create mode 100644 br/tests/lightning_disable_scheduler_by_key_range/data/test.t-schema.sql create mode 100644 br/tests/lightning_disable_scheduler_by_key_range/data/test.t.sql create mode 100644 br/tests/lightning_disable_scheduler_by_key_range/run.sh diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 1f26ddf69eef2..f4953fdea0fb7 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1386,6 +1386,9 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi log.L().Info("start import engine", zap.Stringer("uuid", engineUUID), zap.Int("ranges", len(ranges)), zap.Int64("count", lfLength), zap.Int64("size", lfTotalSize)) + + failpoint.Inject("ReadyForImportEngine", func() {}) + for { unfinishedRanges := lf.unfinishedRanges(ranges) if len(unfinishedRanges) == 0 { diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 5cf3df9389d21..7ea832beb8f3b 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -209,6 +209,14 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error { mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + // Enable failpoint http API for testing. + failpoint.Inject("EnableTestAPI", func() { + mux.HandleFunc("/fail/", func(w http.ResponseWriter, r *http.Request) { + r.URL.Path = strings.TrimPrefix(r.URL.Path, "/fail") + new(failpoint.HttpHandler).ServeHTTP(w, r) + }) + }) + handleTasks := http.StripPrefix("/tasks", http.HandlerFunc(l.handleTask)) mux.Handle("/tasks", httpHandleWrapper(handleTasks.ServeHTTP)) mux.Handle("/tasks/", httpHandleWrapper(handleTasks.ServeHTTP)) diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go old mode 100644 new mode 100755 index c63ac0c4dcdee..dbe00ad8e94ee --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -753,7 +753,7 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L if lastErr == nil { return nil } - if err := errors.Cause(err); err == context.Canceled || err == context.DeadlineExceeded { + if err := errors.Cause(lastErr); err == context.Canceled || err == context.DeadlineExceeded { return errors.Trace(err) } @@ -765,6 +765,26 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L return errors.Trace(lastErr) } +func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string) error { + var lastErr error + for i, addr := range p.addrs { + _, lastErr = pdRequest(ctx, addr, fmt.Sprintf("%s/%s", regionLabelPrefix, ruleID), + p.cli, http.MethodDelete, nil) + if lastErr == nil { + return nil + } + if err := errors.Cause(lastErr); err == context.Canceled || err == context.DeadlineExceeded { + return errors.Trace(err) + } + + if i < len(p.addrs) { + log.Warn("failed to delete region label rule, will try next pd address", + zap.Error(lastErr), zap.String("pdAddr", addr)) + } + } + return errors.Trace(lastErr) +} + // PauseSchedulersByKeyRange will pause schedulers for regions in the specific key range. // This function will spawn a goroutine to keep pausing schedulers periodically until the context is done. // The return done channel is used to notify the caller that the background goroutine is exited. @@ -817,8 +837,8 @@ func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context, star defer cancel() // Set ttl to 0 to remove the rule. rule.Labels[0].TTL = time.Duration(0).String() - if err := p.CreateOrUpdateRegionLabelRule(recoverCtx, rule); err != nil { - log.Warn("failed to remove region label rule, the rule will be removed after ttl expires", + if err := p.DeleteRegionLabelRule(recoverCtx, rule.ID); err != nil { + log.Warn("failed to delete region label rule, the rule will be removed after ttl expires", zap.String("rule-id", rule.ID), zap.Duration("ttl", ttl), zap.Error(err)) } }() diff --git a/br/tests/lightning_disable_scheduler_by_key_range/config.toml b/br/tests/lightning_disable_scheduler_by_key_range/config.toml new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/br/tests/lightning_disable_scheduler_by_key_range/data/test-schema-create.sql b/br/tests/lightning_disable_scheduler_by_key_range/data/test-schema-create.sql new file mode 100644 index 0000000000000..14379bd68472a --- /dev/null +++ b/br/tests/lightning_disable_scheduler_by_key_range/data/test-schema-create.sql @@ -0,0 +1 @@ +CREATE DATABASE test; diff --git a/br/tests/lightning_disable_scheduler_by_key_range/data/test.t-schema.sql b/br/tests/lightning_disable_scheduler_by_key_range/data/test.t-schema.sql new file mode 100644 index 0000000000000..57a1b65732950 --- /dev/null +++ b/br/tests/lightning_disable_scheduler_by_key_range/data/test.t-schema.sql @@ -0,0 +1 @@ +CREATE TABLE t(a INT PRIMARY KEY, b int); diff --git a/br/tests/lightning_disable_scheduler_by_key_range/data/test.t.sql b/br/tests/lightning_disable_scheduler_by_key_range/data/test.t.sql new file mode 100644 index 0000000000000..30e06b42e169b --- /dev/null +++ b/br/tests/lightning_disable_scheduler_by_key_range/data/test.t.sql @@ -0,0 +1 @@ +INSERT INTO t VALUES (1,1); diff --git a/br/tests/lightning_disable_scheduler_by_key_range/run.sh b/br/tests/lightning_disable_scheduler_by_key_range/run.sh new file mode 100644 index 0000000000000..4483dae31b5d9 --- /dev/null +++ b/br/tests/lightning_disable_scheduler_by_key_range/run.sh @@ -0,0 +1,90 @@ +#!/bin/bash +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eux + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/EnableTestAPI=return" +export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine=pause" + +run_lightning --backend='local' & +shpid="$!" +pid= +port= + +ensure_lightning_is_started() { + for _ in {0..60}; do + pid=$(pstree -p "$shpid" | grep -Eo "tidb-lightning\.\([0-9]*\)" | grep -Eo "[0-9]*") || true + [ -n "$pid" ] && break + sleep 1 + done + if [ -z "$pid" ]; then + echo "lightning doesn't start successfully, please check the log" >&2 + exit 1 + fi + echo "lightning is started, pid is $pid" +} + +start_http_server() { + # Start http server to serve the test API. + kill -SIGUSR1 "$pid" &>/dev/null || true + for _ in {0..60}; do + port=$(grep "starting HTTP server" "$TEST_DIR"/lightning.log | grep -Eo "address=.*" | grep -Eo '[0-9]*') || true + [ -n "$port" ] && break + done + if [ -z "$port" ]; then + echo "http server doesn't start successfully, please check the log" >&2 + exit 1 + fi + echo "http server is started, port is $port" +} + +ready_for_import_engine() { + for _ in {0..60}; do + grep -Fq "start import engine" "$TEST_DIR"/lightning.log && return + sleep 1 + done + echo "lightning doesn't start import engine, please check the log" >&2 + exit 1 +} + +run_curl() { + curl \ + --cacert "$TEST_DIR/certs/ca.pem" \ + --cert "$TEST_DIR/certs/curl.pem" \ + --key "$TEST_DIR/certs/curl.key" \ + "$@" +} + +ensure_lightning_is_started +start_http_server +ready_for_import_engine + +length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') +if [ "$length" -ne 1 ]; then + echo "region-label key-range rules should be 1, but got $length" >&2 + exit 1 +fi + +run_curl -X DELETE "https://localhost:${port}/fail/github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine" +wait "$shpid" + +run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" + +length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') +if [ "$length" -ne "0" ]; then + echo "region-label key-range rules should be 0, but got $length" >&2 + exit 1 +fi From 1d6081563bf4e8f3cc313c0556cad5b50de55d1a Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Mon, 20 Jun 2022 18:23:40 +0800 Subject: [PATCH 05/14] use wget --- .../run.sh | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/br/tests/lightning_disable_scheduler_by_key_range/run.sh b/br/tests/lightning_disable_scheduler_by_key_range/run.sh index 4483dae31b5d9..34b700e31f1a2 100644 --- a/br/tests/lightning_disable_scheduler_by_key_range/run.sh +++ b/br/tests/lightning_disable_scheduler_by_key_range/run.sh @@ -60,11 +60,12 @@ ready_for_import_engine() { exit 1 } -run_curl() { - curl \ - --cacert "$TEST_DIR/certs/ca.pem" \ - --cert "$TEST_DIR/certs/curl.pem" \ - --key "$TEST_DIR/certs/curl.key" \ +# FIXME: use `wget` instead of `curl` because the latter rejects ECC certs on our CI. +run_wget() { + wget -q -O - \ + --ca-certificate="$TEST_DIR/certs/ca.pem" \ + --certificate="$TEST_DIR/certs/curl.pem" \ + --private-key="$TEST_DIR/certs/curl.key" \ "$@" } @@ -72,19 +73,17 @@ ensure_lightning_is_started start_http_server ready_for_import_engine -length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') +length=$(run_wget "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') if [ "$length" -ne 1 ]; then echo "region-label key-range rules should be 1, but got $length" >&2 exit 1 fi -run_curl -X DELETE "https://localhost:${port}/fail/github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine" +run_wget --method=DELETE "https://localhost:${port}/fail/github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine" wait "$shpid" -run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" - -length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') -if [ "$length" -ne "0" ]; then +length=$(run_wget "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') +if [ -n "$length" ] && [ "$length" -ne 0 ]; then echo "region-label key-range rules should be 0, but got $length" >&2 exit 1 fi From 6f57ff5931d545c23ab3335033df2e0f10746447 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Tue, 21 Jun 2022 10:20:44 +0800 Subject: [PATCH 06/14] test ci --- br/tests/lightning_disable_scheduler_by_key_range/run.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/tests/lightning_disable_scheduler_by_key_range/run.sh b/br/tests/lightning_disable_scheduler_by_key_range/run.sh index 34b700e31f1a2..57ec8794cb04a 100644 --- a/br/tests/lightning_disable_scheduler_by_key_range/run.sh +++ b/br/tests/lightning_disable_scheduler_by_key_range/run.sh @@ -74,11 +74,12 @@ start_http_server ready_for_import_engine length=$(run_wget "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') -if [ "$length" -ne 1 ]; then +if [ "$length" != "1" ]; then echo "region-label key-range rules should be 1, but got $length" >&2 exit 1 fi +wget --help run_wget --method=DELETE "https://localhost:${port}/fail/github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine" wait "$shpid" From 87aac6a09211760dc6d1d413e8de7beacac50af7 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Tue, 21 Jun 2022 10:46:31 +0800 Subject: [PATCH 07/14] debug ci --- br/tests/lightning_disable_scheduler_by_key_range/run.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/br/tests/lightning_disable_scheduler_by_key_range/run.sh b/br/tests/lightning_disable_scheduler_by_key_range/run.sh index 57ec8794cb04a..d6738353cde46 100644 --- a/br/tests/lightning_disable_scheduler_by_key_range/run.sh +++ b/br/tests/lightning_disable_scheduler_by_key_range/run.sh @@ -73,6 +73,8 @@ ensure_lightning_is_started start_http_server ready_for_import_engine +run_wget "https://${PD_ADDR}/pd/api/v1/config/cluster-version" + length=$(run_wget "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') if [ "$length" != "1" ]; then echo "region-label key-range rules should be 1, but got $length" >&2 From 7e9eb2aa8879f6f2b75e16b3470eb4328aff7923 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Tue, 21 Jun 2022 18:01:20 +0800 Subject: [PATCH 08/14] use static status port --- .../config.toml | 2 ++ .../run.sh | 18 +----------------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/br/tests/lightning_disable_scheduler_by_key_range/config.toml b/br/tests/lightning_disable_scheduler_by_key_range/config.toml index e69de29bb2d1d..dce628f6a61c4 100644 --- a/br/tests/lightning_disable_scheduler_by_key_range/config.toml +++ b/br/tests/lightning_disable_scheduler_by_key_range/config.toml @@ -0,0 +1,2 @@ +[lightning] +status-addr = ":8289" diff --git a/br/tests/lightning_disable_scheduler_by_key_range/run.sh b/br/tests/lightning_disable_scheduler_by_key_range/run.sh index d6738353cde46..a2668e6e816de 100644 --- a/br/tests/lightning_disable_scheduler_by_key_range/run.sh +++ b/br/tests/lightning_disable_scheduler_by_key_range/run.sh @@ -22,7 +22,6 @@ export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/ run_lightning --backend='local' & shpid="$!" pid= -port= ensure_lightning_is_started() { for _ in {0..60}; do @@ -37,20 +36,6 @@ ensure_lightning_is_started() { echo "lightning is started, pid is $pid" } -start_http_server() { - # Start http server to serve the test API. - kill -SIGUSR1 "$pid" &>/dev/null || true - for _ in {0..60}; do - port=$(grep "starting HTTP server" "$TEST_DIR"/lightning.log | grep -Eo "address=.*" | grep -Eo '[0-9]*') || true - [ -n "$port" ] && break - done - if [ -z "$port" ]; then - echo "http server doesn't start successfully, please check the log" >&2 - exit 1 - fi - echo "http server is started, port is $port" -} - ready_for_import_engine() { for _ in {0..60}; do grep -Fq "start import engine" "$TEST_DIR"/lightning.log && return @@ -70,7 +55,6 @@ run_wget() { } ensure_lightning_is_started -start_http_server ready_for_import_engine run_wget "https://${PD_ADDR}/pd/api/v1/config/cluster-version" @@ -82,7 +66,7 @@ if [ "$length" != "1" ]; then fi wget --help -run_wget --method=DELETE "https://localhost:${port}/fail/github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine" +run_wget --method=DELETE "https://localhost:8289/fail/github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine" wait "$shpid" length=$(run_wget "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') From 4d4e9517a18169d238e8f6824dd678e745f06a7f Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 22 Jun 2022 13:43:02 +0800 Subject: [PATCH 09/14] fix ci --- .../run.sh | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/br/tests/lightning_disable_scheduler_by_key_range/run.sh b/br/tests/lightning_disable_scheduler_by_key_range/run.sh index a2668e6e816de..9df6067c8baf8 100644 --- a/br/tests/lightning_disable_scheduler_by_key_range/run.sh +++ b/br/tests/lightning_disable_scheduler_by_key_range/run.sh @@ -17,7 +17,7 @@ set -eux export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/EnableTestAPI=return" -export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine=pause" +export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine=sleep(10000)" run_lightning --backend='local' & shpid="$!" @@ -45,31 +45,20 @@ ready_for_import_engine() { exit 1 } -# FIXME: use `wget` instead of `curl` because the latter rejects ECC certs on our CI. -run_wget() { - wget -q -O - \ - --ca-certificate="$TEST_DIR/certs/ca.pem" \ - --certificate="$TEST_DIR/certs/curl.pem" \ - --private-key="$TEST_DIR/certs/curl.key" \ - "$@" -} - ensure_lightning_is_started ready_for_import_engine -run_wget "https://${PD_ADDR}/pd/api/v1/config/cluster-version" +run_curl "https://${PD_ADDR}/pd/api/v1/config/cluster-version" -length=$(run_wget "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') +length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') if [ "$length" != "1" ]; then echo "region-label key-range rules should be 1, but got $length" >&2 exit 1 fi -wget --help -run_wget --method=DELETE "https://localhost:8289/fail/github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine" wait "$shpid" -length=$(run_wget "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') +length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length') if [ -n "$length" ] && [ "$length" -ne 0 ]; then echo "region-label key-range rules should be 0, but got $length" >&2 exit 1 From fcbda40a301175eed741828f1ff3ba806bf85dda Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 22 Jun 2022 15:29:07 +0800 Subject: [PATCH 10/14] fix restore --- br/pkg/lightning/restore/restore.go | 50 ++++++++++++++++------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 2563ea5c39022..6246c27ef411b 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1380,35 +1380,41 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) { switchBack := false cleanup := false postProgress := func() error { return nil } - if rc.cfg.TikvImporter.Backend == config.BackendLocal && !rc.taskMgr.CanPauseSchedulerByKeyRange() { - logTask.Info("removing PD leader®ion schedulers") + if rc.cfg.TikvImporter.Backend == config.BackendLocal { + var restoreFn pdutil.UndoFunc - restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx) - if err != nil { - return errors.Trace(err) + if !rc.taskMgr.CanPauseSchedulerByKeyRange() { + logTask.Info("removing PD leader®ion schedulers") + + var err error + restoreFn, err = rc.taskMgr.CheckAndPausePdSchedulers(ctx) + if err != nil { + return errors.Trace(err) + } } finishSchedulers = func() { - if restoreFn != nil { - taskFinished := finalErr == nil - // use context.Background to make sure this restore function can still be executed even if ctx is canceled - restoreCtx := context.Background() - needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished) - if err != nil { - logTask.Warn("check restore pd schedulers failed", zap.Error(err)) - return - } - switchBack = needSwitchBack - if needSwitchBack { - logTask.Info("add back PD leader®ion schedulers") - if restoreE := restoreFn(restoreCtx); restoreE != nil { - logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) - } + taskFinished := finalErr == nil + // use context.Background to make sure this restore function can still be executed even if ctx is canceled + restoreCtx := context.Background() + needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished) + if err != nil { + logTask.Warn("check restore pd schedulers failed", zap.Error(err)) + return + } + switchBack = needSwitchBack + cleanup = needCleanup + + if needSwitchBack && restoreFn != nil { + logTask.Info("add back PD leader®ion schedulers") + if restoreE := restoreFn(restoreCtx); restoreE != nil { + logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } - cleanup = needCleanup } - rc.taskMgr.Close() + if rc.taskMgr != nil { + rc.taskMgr.Close() + } } } From f340e890ae03127bc95c5c794e64d438d016866e Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 22 Jun 2022 15:50:14 +0800 Subject: [PATCH 11/14] fix unstable test --- br/tests/lightning_incremental/run.sh | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/br/tests/lightning_incremental/run.sh b/br/tests/lightning_incremental/run.sh index 4cdd5a53ec74b..97b02b962afbe 100644 --- a/br/tests/lightning_incremental/run.sh +++ b/br/tests/lightning_incremental/run.sh @@ -25,8 +25,6 @@ run_lightning_and_check_meta() { check_not_contains "Database: lightning_metadata" } -DB_NAME=incr - run_sql "DROP DATABASE IF EXISTS incr;" run_sql "DROP DATABASE IF EXISTS lightning_metadata;" run_lightning_and_check_meta @@ -48,7 +46,7 @@ for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do done for tbl in pk_auto_inc rowid_uk_inc; do - run_sql "SELECT group_concat(v) from incr.$tbl group by 'all';" + run_sql "SELECT group_concat(v) from incr.$tbl order by v;" check_contains "group_concat(v): a,b,c" done @@ -75,7 +73,7 @@ for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do done for tbl in pk_auto_inc rowid_uk_inc; do - run_sql "SELECT group_concat(v) from incr.$tbl group by 'all';" + run_sql "SELECT group_concat(v) from incr.$tbl order by v;" check_contains "group_concat(v): a,b,c,d,e,f" done From 9a84a312244eeb4ebe324f83d4c2d722b411d902 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Wed, 22 Jun 2022 16:45:17 +0800 Subject: [PATCH 12/14] fix test --- br/pkg/errors/errors.go | 14 ++++++++++++++ br/pkg/errors/errors_test.go | 24 ++++++++++++++++++++++++ br/pkg/pdutil/pd.go | 8 ++++---- br/pkg/pdutil/pd_serial_test.go | 7 +++++++ br/tests/lightning_incremental/run.sh | 8 ++++---- 5 files changed, 53 insertions(+), 8 deletions(-) create mode 100644 br/pkg/errors/errors_test.go diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index 67f8a5c15d4ba..7585f7bc43673 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -3,6 +3,9 @@ package errors import ( + "context" + stderrors "errors" + "github.com/pingcap/errors" ) @@ -15,6 +18,17 @@ func Is(err error, is *errors.Error) bool { return errorFound != nil } +// IsContextCanceled checks whether the is caused by context.Canceled. +// errors.Cause does not work for the error wrapped by %w in fmt.Errorf. +// So we need to call stderrors.Is to unwrap the error. +func IsContextCanceled(err error) bool { + err = errors.Cause(err) + if err == context.Canceled || err == context.DeadlineExceeded { + return true + } + return stderrors.Is(err, context.Canceled) || stderrors.Is(err, context.DeadlineExceeded) +} + // BR errors. var ( ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown")) diff --git a/br/pkg/errors/errors_test.go b/br/pkg/errors/errors_test.go new file mode 100644 index 0000000000000..a6f4c412280cc --- /dev/null +++ b/br/pkg/errors/errors_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + +package errors_test + +import ( + "context" + "net/url" + "testing" + + "github.com/pingcap/errors" + berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestIsContextCanceled(t *testing.T) { + require.False(t, berrors.IsContextCanceled(nil)) + require.False(t, berrors.IsContextCanceled(errors.New("connection closed"))) + require.True(t, berrors.IsContextCanceled(context.Canceled)) + require.True(t, berrors.IsContextCanceled(context.DeadlineExceeded)) + require.True(t, berrors.IsContextCanceled(errors.Trace(context.Canceled))) + require.True(t, berrors.IsContextCanceled(errors.Trace(context.DeadlineExceeded))) + require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.Canceled})) + require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.DeadlineExceeded})) +} diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index dbe00ad8e94ee..0e6e5c7094096 100755 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -753,8 +753,8 @@ func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule L if lastErr == nil { return nil } - if err := errors.Cause(lastErr); err == context.Canceled || err == context.DeadlineExceeded { - return errors.Trace(err) + if berrors.IsContextCanceled(lastErr) { + return errors.Trace(lastErr) } if i < len(p.addrs) { @@ -773,8 +773,8 @@ func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string) if lastErr == nil { return nil } - if err := errors.Cause(lastErr); err == context.Canceled || err == context.DeadlineExceeded { - return errors.Trace(err) + if berrors.IsContextCanceled(lastErr) { + return errors.Trace(lastErr) } if i < len(p.addrs) { diff --git a/br/pkg/pdutil/pd_serial_test.go b/br/pkg/pdutil/pd_serial_test.go index 73eb4048b278e..608830fe190fe 100644 --- a/br/pkg/pdutil/pd_serial_test.go +++ b/br/pkg/pdutil/pd_serial_test.go @@ -12,6 +12,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "strings" "testing" "time" @@ -239,6 +240,12 @@ func TestPauseSchedulersByKeyRange(t *testing.T) { labelExpires := make(map[string]time.Time) httpSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodDelete { + ruleID := strings.TrimPrefix(r.URL.Path, "/"+regionLabelPrefix+"/") + print(ruleID) + delete(labelExpires, ruleID) + return + } var labelRule LabelRule err := json.NewDecoder(r.Body).Decode(&labelRule) require.NoError(t, err) diff --git a/br/tests/lightning_incremental/run.sh b/br/tests/lightning_incremental/run.sh index 97b02b962afbe..a025e7bebc6f1 100644 --- a/br/tests/lightning_incremental/run.sh +++ b/br/tests/lightning_incremental/run.sh @@ -46,8 +46,8 @@ for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do done for tbl in pk_auto_inc rowid_uk_inc; do - run_sql "SELECT group_concat(v) from incr.$tbl order by v;" - check_contains "group_concat(v): a,b,c" + run_sql "SELECT group_concat(v order by v) as result from incr.$tbl group by 'all';" + check_contains "result: a,b,c" done run_sql "SELECT sum(pk) from incr.uk_auto_inc;" @@ -73,8 +73,8 @@ for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do done for tbl in pk_auto_inc rowid_uk_inc; do - run_sql "SELECT group_concat(v) from incr.$tbl order by v;" - check_contains "group_concat(v): a,b,c,d,e,f" + run_sql "SELECT group_concat(v order by v) as result from incr.$tbl group by 'all';" + check_contains "result: a,b,c,d,e,f" done run_sql "SELECT sum(pk) from incr.uk_auto_inc;" From ef65a9f5b2a5b1dd61e3b896ce70d19c79c8d326 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Fri, 24 Jun 2022 12:00:24 +0800 Subject: [PATCH 13/14] Update br/pkg/pdutil/pd.go Co-authored-by: Obliviate <756541536@qq.com> --- br/pkg/pdutil/pd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index 0e6e5c7094096..ba2c2706ba9aa 100755 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -823,7 +823,7 @@ func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context, star select { case <-ticker.C: if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil { - if err := errors.Cause(err); err == context.Canceled || err == context.DeadlineExceeded { + if err := errors.Cause(err); IsContextCanceled(err) { break loop } log.Warn("pause scheduler by key range failed, ignore it and wait next time pause", zap.Error(err)) From c27bcf84c5ce9e60a330b65ae808ce8f353f377f Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Fri, 24 Jun 2022 12:12:31 +0800 Subject: [PATCH 14/14] fix build --- br/pkg/pdutil/pd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index ba2c2706ba9aa..1c2d8dd6754bc 100755 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -823,7 +823,7 @@ func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context, star select { case <-ticker.C: if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil { - if err := errors.Cause(err); IsContextCanceled(err) { + if berrors.IsContextCanceled(err) { break loop } log.Warn("pause scheduler by key range failed, ignore it and wait next time pause", zap.Error(err))