diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go
index 67f8a5c15d4ba..7585f7bc43673 100644
--- a/br/pkg/errors/errors.go
+++ b/br/pkg/errors/errors.go
@@ -3,6 +3,9 @@
 package errors
 
 import (
+    "context"
+    stderrors "errors"
+
     "github.com/pingcap/errors"
 )
@@ -15,6 +18,17 @@ func Is(err error, is *errors.Error) bool {
     return errorFound != nil
 }
 
+// IsContextCanceled checks whether the error is caused by context.Canceled
+// or context.DeadlineExceeded. errors.Cause does not work for errors wrapped
+// by %w in fmt.Errorf, so we also call stderrors.Is to unwrap the error.
+func IsContextCanceled(err error) bool {
+    err = errors.Cause(err)
+    if err == context.Canceled || err == context.DeadlineExceeded {
+        return true
+    }
+    return stderrors.Is(err, context.Canceled) || stderrors.Is(err, context.DeadlineExceeded)
+}
+
 // BR errors.
 var (
     ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
diff --git a/br/pkg/errors/errors_test.go b/br/pkg/errors/errors_test.go
new file mode 100644
index 0000000000000..a6f4c412280cc
--- /dev/null
+++ b/br/pkg/errors/errors_test.go
@@ -0,0 +1,24 @@
+// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
+
+package errors_test
+
+import (
+    "context"
+    "net/url"
+    "testing"
+
+    "github.com/pingcap/errors"
+    berrors "github.com/pingcap/tidb/br/pkg/errors"
+    "github.com/stretchr/testify/require"
+)
+
+func TestIsContextCanceled(t *testing.T) {
+    require.False(t, berrors.IsContextCanceled(nil))
+    require.False(t, berrors.IsContextCanceled(errors.New("connection closed")))
+    require.True(t, berrors.IsContextCanceled(context.Canceled))
+    require.True(t, berrors.IsContextCanceled(context.DeadlineExceeded))
+    require.True(t, berrors.IsContextCanceled(errors.Trace(context.Canceled)))
+    require.True(t, berrors.IsContextCanceled(errors.Trace(context.DeadlineExceeded)))
+    require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.Canceled}))
+    require.True(t, berrors.IsContextCanceled(&url.Error{Err: context.DeadlineExceeded}))
+}
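For context, the behaviour the new helper relies on can be reproduced with the standard library alone. The sketch below is illustrative only (it is not part of the patch) and covers the same three cases the new test exercises: a %w-wrapped cancellation, a url.Error from a canceled request, and an unrelated error.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "net/url"
)

func main() {
    // Wrapping with %w keeps the chain visible to errors.Is, which is what
    // IsContextCanceled falls back to after errors.Cause has peeled off any
    // pingcap/errors annotations.
    wrapped := fmt.Errorf("scan regions: %w", context.Canceled)
    fmt.Println(errors.Is(wrapped, context.Canceled)) // true

    // url.Error implements Unwrap, so a canceled or timed-out HTTP request
    // to PD is also recognized.
    urlErr := &url.Error{Op: "Post", URL: "http://127.0.0.1:2379", Err: context.DeadlineExceeded}
    fmt.Println(errors.Is(urlErr, context.DeadlineExceeded)) // true

    // A plain error with no wrapped cause is not treated as a cancellation.
    fmt.Println(errors.Is(errors.New("connection closed"), context.Canceled)) // false
}
```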
"sort" "strings" @@ -332,14 +331,7 @@ func (local *local) SplitAndScatterRegionByRanges( } startTime := time.Now() - scatterCount := 0 - for _, region := range scatterRegions { - local.waitForScatterRegion(ctx, region) - if time.Since(startTime) > split.ScatterWaitUpperInterval { - break - } - scatterCount++ - } + scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions) if scatterCount == len(scatterRegions) { log.FromContext(ctx).Info("waiting for scattering regions done", zap.Int("skipped_keys", skippedKeys), @@ -349,7 +341,8 @@ func (local *local) SplitAndScatterRegionByRanges( zap.Int("skipped_keys", skippedKeys), zap.Int("scatterCount", scatterCount), zap.Int("regions", len(scatterRegions)), - zap.Duration("take", time.Since(startTime))) + zap.Duration("take", time.Since(startTime)), + zap.Error(err)) } return nil } @@ -447,28 +440,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) { } } -func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) { - for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ { - ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo) - if ok { - return - } - if err != nil { - if !common.IsRetryableError(err) { - log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err)) - return +func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) { + subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval) + defer cancel() + + for len(regions) > 0 { + var retryRegions []*split.RegionInfo + for _, region := range regions { + scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region) + if scattered { + scatterCount++ + continue } - log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err)) + if err != nil { + if !common.IsRetryableError(err) { + log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err)) + return scatterCount, err + } + log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err)) + } + retryRegions = append(retryRegions, region) } + regions = retryRegions select { case <-time.After(time.Second): - case <-ctx.Done(): + case <-subCtx.Done(): return } } + return scatterCount, nil } -func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { +func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) { resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId()) if err != nil { return false, err @@ -478,13 +481,9 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND { return true, nil } - // don't return error if region replicate not complete - // TODO: should add a new error type to avoid this check by string matching - matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message) - if matches { - return false, nil - } - return false, errors.Errorf("get operator error: %s", respErr.GetType()) + return false, errors.Errorf( + "failed to get region operator, error type: %s, error message: %s", + respErr.GetType().String(), 
diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go
index 9839e3592d195..1672b5f212436 100644
--- a/br/pkg/lightning/backend/local/localhelper.go
+++ b/br/pkg/lightning/backend/local/localhelper.go
@@ -19,7 +19,6 @@ import (
     "context"
     "database/sql"
     "math"
-    "regexp"
     "runtime"
     "sort"
     "strings"
@@ -332,14 +331,7 @@ func (local *local) SplitAndScatterRegionByRanges(
     }
 
     startTime := time.Now()
-    scatterCount := 0
-    for _, region := range scatterRegions {
-        local.waitForScatterRegion(ctx, region)
-        if time.Since(startTime) > split.ScatterWaitUpperInterval {
-            break
-        }
-        scatterCount++
-    }
+    scatterCount, err := local.waitForScatterRegions(ctx, scatterRegions)
     if scatterCount == len(scatterRegions) {
         log.FromContext(ctx).Info("waiting for scattering regions done",
             zap.Int("skipped_keys", skippedKeys),
@@ -349,7 +341,8 @@ func (local *local) SplitAndScatterRegionByRanges(
             zap.Int("skipped_keys", skippedKeys),
             zap.Int("scatterCount", scatterCount),
             zap.Int("regions", len(scatterRegions)),
-            zap.Duration("take", time.Since(startTime)))
+            zap.Duration("take", time.Since(startTime)),
+            zap.Error(err))
     }
     return nil
 }
@@ -447,28 +440,38 @@ func (local *local) waitForSplit(ctx context.Context, regionID uint64) {
     }
 }
 
-func (local *local) waitForScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) {
-    for i := 0; i < split.ScatterWaitMaxRetryTimes; i++ {
-        ok, err := local.checkScatterRegionFinishedOrReScatter(ctx, regionInfo)
-        if ok {
-            return
-        }
-        if err != nil {
-            if !common.IsRetryableError(err) {
-                log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(regionInfo.Region), zap.Error(err))
-                return
+func (local *local) waitForScatterRegions(ctx context.Context, regions []*split.RegionInfo) (scatterCount int, _ error) {
+    subCtx, cancel := context.WithTimeout(ctx, split.ScatterWaitUpperInterval)
+    defer cancel()
+
+    for len(regions) > 0 {
+        var retryRegions []*split.RegionInfo
+        for _, region := range regions {
+            scattered, err := local.checkRegionScatteredOrReScatter(subCtx, region)
+            if scattered {
+                scatterCount++
+                continue
             }
-            log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(regionInfo.Region), zap.Error(err))
+            if err != nil {
+                if !common.IsRetryableError(err) {
+                    log.FromContext(ctx).Warn("wait for scatter region encountered non-retryable error", logutil.Region(region.Region), zap.Error(err))
+                    return scatterCount, err
+                }
+                log.FromContext(ctx).Warn("wait for scatter region encountered error, will retry again", logutil.Region(region.Region), zap.Error(err))
+            }
+            retryRegions = append(retryRegions, region)
         }
+        regions = retryRegions
         select {
         case <-time.After(time.Second):
-        case <-ctx.Done():
+        case <-subCtx.Done():
             return
         }
     }
+    return scatterCount, nil
 }
 
-func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
+func (local *local) checkRegionScatteredOrReScatter(ctx context.Context, regionInfo *split.RegionInfo) (bool, error) {
     resp, err := local.splitCli.GetOperator(ctx, regionInfo.Region.GetId())
     if err != nil {
         return false, err
@@ -478,13 +481,9 @@ func (local *local) checkScatterRegionFinishedOrReScatter(ctx context.Context, r
         if respErr.GetType() == pdpb.ErrorType_REGION_NOT_FOUND {
             return true, nil
         }
-        // don't return error if region replicate not complete
-        // TODO: should add a new error type to avoid this check by string matching
-        matches, _ := regexp.MatchString("region \\d+ is not fully replicated", respErr.Message)
-        if matches {
-            return false, nil
-        }
-        return false, errors.Errorf("get operator error: %s", respErr.GetType())
+        return false, errors.Errorf(
+            "failed to get region operator, error type: %s, error message: %s",
+            respErr.GetType().String(), respErr.GetMessage())
     }
     // If the current operator of the region is not 'scatter-region', we could assume
     // that 'scatter-operator' has finished.
diff --git a/br/pkg/lightning/common/retry.go b/br/pkg/lightning/common/retry.go
index accf7423414b4..3a7757d650738 100644
--- a/br/pkg/lightning/common/retry.go
+++ b/br/pkg/lightning/common/retry.go
@@ -20,6 +20,7 @@ import (
     "io"
     "net"
     "os"
+    "regexp"
     "syscall"
 
     "github.com/go-sql-driver/mysql"
@@ -30,6 +31,8 @@ import (
     "google.golang.org/grpc/status"
 )
 
+var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)
+
 // IsRetryableError returns whether the error is transient (e.g. network
 // connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
 // function returns `false` (irrecoverable) if `err == nil`.
@@ -88,6 +91,9 @@ func isSingleRetryableError(err error) bool {
         }
         return false
     default:
+        if regionNotFullyReplicatedRe.MatchString(err.Error()) {
+            return true
+        }
         switch status.Code(err) {
         case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
             return true
diff --git a/br/pkg/lightning/common/retry_test.go b/br/pkg/lightning/common/retry_test.go
index 670004260f5a1..b707a8d1d1c5f 100644
--- a/br/pkg/lightning/common/retry_test.go
+++ b/br/pkg/lightning/common/retry_test.go
@@ -94,4 +94,6 @@ func TestIsRetryableError(t *testing.T) {
     require.False(t, IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)))
     require.True(t, IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})))
     require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))
+
+    require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
 }
diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go
index 111b7c93b59b4..6e0f63f4df463 100644
--- a/br/pkg/lightning/lightning.go
+++ b/br/pkg/lightning/lightning.go
@@ -209,6 +209,14 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
     mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
     mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
 
+    // Enable failpoint http API for testing.
+    failpoint.Inject("EnableTestAPI", func() {
+        mux.HandleFunc("/fail/", func(w http.ResponseWriter, r *http.Request) {
+            r.URL.Path = strings.TrimPrefix(r.URL.Path, "/fail")
+            new(failpoint.HttpHandler).ServeHTTP(w, r)
+        })
+    })
+
     handleTasks := http.StripPrefix("/tasks", http.HandlerFunc(l.handleTask))
     mux.Handle("/tasks", httpHandleWrapper(handleTasks.ServeHTTP))
     mux.Handle("/tasks/", httpHandleWrapper(handleTasks.ServeHTTP))
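As an aside, once lightning runs with the EnableTestAPI failpoint enabled (the new integration test does this via GO_FAILPOINTS), failpoints can be toggled over HTTP through the /fail/ route added above. The sketch below is illustrative and not part of the patch: the 127.0.0.1:8289 address mirrors the new test's config.toml, and the PUT-with-expression-body / GET-to-list convention is the commonly used failpoint HTTP API, which should be checked against the failpoint version vendored by TiDB.

```go
package main

import (
    "fmt"
    "io"
    "net/http"
    "strings"
)

func main() {
    // Status server address; ":8289" comes from the new test's config.toml,
    // the host below is only an assumption for illustration.
    const statusAddr = "http://127.0.0.1:8289"
    const fp = "github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine"

    // PUT /fail/<failpoint> with the expression as the body activates it.
    req, err := http.NewRequest(http.MethodPut, statusAddr+"/fail/"+fp, strings.NewReader("sleep(10000)"))
    if err != nil {
        panic(err)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // GET /fail/ lists the currently active failpoints.
    listResp, err := http.Get(statusAddr + "/fail/")
    if err != nil {
        panic(err)
    }
    defer listResp.Body.Close()
    body, _ := io.ReadAll(listResp.Body)
    fmt.Println(resp.Status, string(body))
}
```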
diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go
index 0af04e69feedb..b94bde8208be6 100644
--- a/br/pkg/lightning/restore/meta_manager.go
+++ b/br/pkg/lightning/restore/meta_manager.go
@@ -555,6 +555,8 @@ type taskMetaMgr interface {
     // need to update or any new tasks. There is at most one lightning who can execute the action function at the same time.
     // Note that action may be executed multiple times due to transaction retry, caller should make sure it's idempotent.
     CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error
+    // CanPauseSchedulerByKeyRange returns whether the schedulers can be paused by key range.
+    CanPauseSchedulerByKeyRange() bool
     CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
     // CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
     // Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
@@ -867,6 +869,10 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
     }, nil
 }
 
+func (m *dbTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
+    return m.pd.CanPauseSchedulerByKeyRange()
+}
+
 // CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
 // Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
 // the second boolean indicates whether to clean up the metadata in tidb
@@ -1058,6 +1064,10 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.
     }, nil
 }
 
+func (m noopTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
+    return false
+}
+
 func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
     return true, nil
 }
@@ -1168,6 +1178,10 @@ func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdut
     return m.pd.RemoveSchedulers(ctx)
 }
 
+func (m *singleTaskMetaMgr) CanPauseSchedulerByKeyRange() bool {
+    return m.pd.CanPauseSchedulerByKeyRange()
+}
+
 func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
     return m.initialized, nil
 }
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 31a48c620846a..6246c27ef411b 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -1370,42 +1370,51 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
     // make split region and ingest sst more stable
     // because importer backend is mostly use for v3.x cluster which doesn't support these api,
     // so we also don't do this for import backend
-    finishSchedulers := func() {}
+    finishSchedulers := func() {
+        if rc.taskMgr != nil {
+            rc.taskMgr.Close()
+        }
+    }
     // if one lightning failed abnormally, and can't determine whether it needs to switch back,
     // we do not do switch back automatically
     switchBack := false
     cleanup := false
     postProgress := func() error { return nil }
     if rc.cfg.TikvImporter.Backend == config.BackendLocal {
+        var restoreFn pdutil.UndoFunc
 
-        logTask.Info("removing PD leader&region schedulers")
+        if !rc.taskMgr.CanPauseSchedulerByKeyRange() {
+            logTask.Info("removing PD leader&region schedulers")
 
-        restoreFn, err := rc.taskMgr.CheckAndPausePdSchedulers(ctx)
-        if err != nil {
-            return errors.Trace(err)
+            var err error
+            restoreFn, err = rc.taskMgr.CheckAndPausePdSchedulers(ctx)
+            if err != nil {
+                return errors.Trace(err)
+            }
         }
 
         finishSchedulers = func() {
-            if restoreFn != nil {
-                taskFinished := finalErr == nil
-                // use context.Background to make sure this restore function can still be executed even if ctx is canceled
-                restoreCtx := context.Background()
-                needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished)
-                if err != nil {
-                    logTask.Warn("check restore pd schedulers failed", zap.Error(err))
-                    return
-                }
-                switchBack = needSwitchBack
-                if needSwitchBack {
-                    logTask.Info("add back PD leader&region schedulers")
-                    if restoreE := restoreFn(restoreCtx); restoreE != nil {
-                        logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
-                    }
+            taskFinished := finalErr == nil
+            // use context.Background to make sure this restore function can still be executed even if ctx is canceled
+            restoreCtx := context.Background()
+            needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished)
+            if err != nil {
+                logTask.Warn("check restore pd schedulers failed", zap.Error(err))
+                return
+            }
+            switchBack = needSwitchBack
+            cleanup = needCleanup
+
+            if needSwitchBack && restoreFn != nil {
+                logTask.Info("add back PD leader&region schedulers")
+                if restoreE := restoreFn(restoreCtx); restoreE != nil {
+                    logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
                 }
-                cleanup = needCleanup
             }
-            rc.taskMgr.Close()
+
+            if rc.taskMgr != nil {
+                rc.taskMgr.Close()
+            }
         }
     }
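One detail of finishSchedulers worth spelling out: the switch-back work intentionally ignores the (possibly already canceled) task context and runs on context.Background(). A minimal sketch of that pattern follows; it is illustrative only, and the five-minute timeout is an assumption, not a value from the patch.

```go
package example

import (
    "context"
    "time"
)

// runCleanup mirrors the pattern used by finishSchedulers: cleanup such as
// restoring PD schedulers must still run when the import context has been
// canceled, so it gets a fresh context (optionally bounded by a timeout).
func runCleanup(cleanup func(context.Context) error) error {
    restoreCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()
    return cleanup(restoreCtx)
}
```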
manually", zap.Error(restoreE)) - } + taskFinished := finalErr == nil + // use context.Background to make sure this restore function can still be executed even if ctx is canceled + restoreCtx := context.Background() + needSwitchBack, needCleanup, err := rc.taskMgr.CheckAndFinishRestore(restoreCtx, taskFinished) + if err != nil { + logTask.Warn("check restore pd schedulers failed", zap.Error(err)) + return + } + switchBack = needSwitchBack + cleanup = needCleanup + + if needSwitchBack && restoreFn != nil { + logTask.Info("add back PD leader®ion schedulers") + if restoreE := restoreFn(restoreCtx); restoreE != nil { + logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE)) } - cleanup = needCleanup } - rc.taskMgr.Close() + if rc.taskMgr != nil { + rc.taskMgr.Close() + } } } diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go old mode 100644 new mode 100755 index 599eefe30afb5..1c2d8dd6754bc --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "crypto/tls" + "encoding/hex" "encoding/json" "fmt" "io" @@ -17,6 +18,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/docker/go-units" + "github.com/google/uuid" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -36,6 +38,7 @@ const ( regionCountPrefix = "pd/api/v1/stats/region" storePrefix = "pd/api/v1/store" schedulerPrefix = "pd/api/v1/schedulers" + regionLabelPrefix = "pd/api/v1/config/region-label/rule" maxMsgSize = int(128 * units.MiB) // pd.ScanRegion may return a large response scheduleConfigPrefix = "pd/api/v1/config/schedule" configPrefix = "pd/api/v1/config" @@ -94,6 +97,9 @@ var ( // see https://github.com/tikv/pd/pull/3088 pauseConfigVersion = semver.Version{Major: 4, Minor: 0, Patch: 8} + // After v6.1.0 version, we can pause schedulers by key range with TTL. + minVersionForRegionLabelTTL = semver.Version{Major: 6, Minor: 1, Patch: 0} + // Schedulers represent region/leader schedulers which can impact on performance. Schedulers = map[string]struct{}{ "balance-leader-scheduler": {}, @@ -130,9 +136,9 @@ var ( ) // pdHTTPRequest defines the interface to send a request to pd and return the result in bytes. -type pdHTTPRequest func(context.Context, string, string, *http.Client, string, io.Reader) ([]byte, error) +type pdHTTPRequest func(ctx context.Context, addr string, prefix string, cli *http.Client, method string, body io.Reader) ([]byte, error) -// pdRequest is a func to send a HTTP to pd and return the result bytes. +// pdRequest is a func to send an HTTP to pd and return the result bytes. func pdRequest( ctx context.Context, addr string, prefix string, @@ -709,6 +715,142 @@ func (p *PdController) doRemoveSchedulersWith( return removedSchedulers, err } +// RegionLabel is the label of a region. This struct is partially copied from +// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L31. +type RegionLabel struct { + Key string `json:"key"` + Value string `json:"value"` + TTL string `json:"ttl,omitempty"` + StartAt string `json:"start_at,omitempty"` +} + +// LabelRule is the rule to assign labels to a region. This struct is partially copied from +// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L41. 
+// CreateOrUpdateRegionLabelRule creates or updates a region label rule.
+func (p *PdController) CreateOrUpdateRegionLabelRule(ctx context.Context, rule LabelRule) error {
+    reqData, err := json.Marshal(&rule)
+    if err != nil {
+        panic(err)
+    }
+    var lastErr error
+    for i, addr := range p.addrs {
+        _, lastErr = pdRequest(ctx, addr, regionLabelPrefix,
+            p.cli, http.MethodPost, bytes.NewBuffer(reqData))
+        if lastErr == nil {
+            return nil
+        }
+        if berrors.IsContextCanceled(lastErr) {
+            return errors.Trace(lastErr)
+        }
+
+        if i < len(p.addrs) {
+            log.Warn("failed to create or update region label rule, will try next pd address",
+                zap.Error(lastErr), zap.String("pdAddr", addr))
+        }
+    }
+    return errors.Trace(lastErr)
+}
+
+// DeleteRegionLabelRule deletes the region label rule identified by ruleID.
+func (p *PdController) DeleteRegionLabelRule(ctx context.Context, ruleID string) error {
+    var lastErr error
+    for i, addr := range p.addrs {
+        _, lastErr = pdRequest(ctx, addr, fmt.Sprintf("%s/%s", regionLabelPrefix, ruleID),
+            p.cli, http.MethodDelete, nil)
+        if lastErr == nil {
+            return nil
+        }
+        if berrors.IsContextCanceled(lastErr) {
+            return errors.Trace(lastErr)
+        }
+
+        if i < len(p.addrs) {
+            log.Warn("failed to delete region label rule, will try next pd address",
+                zap.Error(lastErr), zap.String("pdAddr", addr))
+        }
+    }
+    return errors.Trace(lastErr)
+}
+
+// PauseSchedulersByKeyRange will pause schedulers for regions in the specific key range.
+// This function will spawn a goroutine to keep pausing schedulers periodically until the context is done.
+// The returned done channel notifies the caller that the background goroutine has exited.
+func (p *PdController) PauseSchedulersByKeyRange(ctx context.Context, startKey, endKey []byte) (done <-chan struct{}, err error) {
+    return p.pauseSchedulerByKeyRangeWithTTL(ctx, startKey, endKey, pauseTimeout)
+}
+
+func (p *PdController) pauseSchedulerByKeyRangeWithTTL(ctx context.Context, startKey, endKey []byte, ttl time.Duration) (_done <-chan struct{}, err error) {
+    rule := LabelRule{
+        ID: uuid.New().String(),
+        Labels: []RegionLabel{{
+            Key:   "schedule",
+            Value: "deny",
+            TTL:   ttl.String(),
+        }},
+        RuleType: "key-range",
+        // Data should be a list of KeyRangeRule when rule type is key-range.
+        // See https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L169.
+        Data: []KeyRangeRule{{
+            StartKeyHex: hex.EncodeToString(startKey),
+            EndKeyHex:   hex.EncodeToString(endKey),
+        }},
+    }
+    done := make(chan struct{})
+    if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil {
+        close(done)
+        return nil, errors.Trace(err)
+    }
+
+    go func() {
+        defer close(done)
+        ticker := time.NewTicker(ttl / 3)
+        defer ticker.Stop()
+    loop:
+        for {
+            select {
+            case <-ticker.C:
+                if err := p.CreateOrUpdateRegionLabelRule(ctx, rule); err != nil {
+                    if berrors.IsContextCanceled(err) {
+                        break loop
+                    }
+                    log.Warn("pause scheduler by key range failed, ignore it and wait next time pause", zap.Error(err))
+                }
+            case <-ctx.Done():
+                break loop
+            }
+        }
+        // Use a new context in case the caller's context has already been canceled.
+        recoverCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+        defer cancel()
+        // Set ttl to 0 to remove the rule.
+        rule.Labels[0].TTL = time.Duration(0).String()
+        if err := p.DeleteRegionLabelRule(recoverCtx, rule.ID); err != nil {
+            log.Warn("failed to delete region label rule, the rule will be removed after ttl expires",
+                zap.String("rule-id", rule.ID), zap.Duration("ttl", ttl), zap.Error(err))
+        }
+    }()
+    return done, nil
+}
+
+// CanPauseSchedulerByKeyRange returns whether the scheduler can be paused by key range.
+func (p *PdController) CanPauseSchedulerByKeyRange() bool {
+    // We need the TTL feature to ensure schedulers recover from the pause automatically.
+    return p.version.Compare(minVersionForRegionLabelTTL) >= 0
+}
+
 // Close close the connection to pd.
 func (p *PdController) Close() {
     p.pdClient.Close()
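To make the payload concrete, the standalone sketch below mirrors the structs above and prints the JSON that CreateOrUpdateRegionLabelRule POSTs to pd/api/v1/config/region-label/rule. It is not part of the patch; the rule ID, keys, and TTL are illustrative values.

```go
package main

import (
    "encoding/hex"
    "encoding/json"
    "fmt"
)

// Local copies of the structs defined in pd.go, kept only to show the wire format.
type RegionLabel struct {
    Key     string `json:"key"`
    Value   string `json:"value"`
    TTL     string `json:"ttl,omitempty"`
    StartAt string `json:"start_at,omitempty"`
}

type KeyRangeRule struct {
    StartKeyHex string `json:"start_key"`
    EndKeyHex   string `json:"end_key"`
}

type LabelRule struct {
    ID       string        `json:"id"`
    Labels   []RegionLabel `json:"labels"`
    RuleType string        `json:"rule_type"`
    Data     interface{}   `json:"data"`
}

func main() {
    rule := LabelRule{
        ID:       "example-rule-id", // the real code uses uuid.New().String()
        Labels:   []RegionLabel{{Key: "schedule", Value: "deny", TTL: "30s"}},
        RuleType: "key-range",
        Data: []KeyRangeRule{{
            StartKeyHex: hex.EncodeToString([]byte("example-start-key")),
            EndKeyHex:   hex.EncodeToString([]byte("example-end-key")),
        }},
    }
    out, _ := json.MarshalIndent(rule, "", "  ")
    // PD denies scheduling for the covered key range until the TTL lapses or
    // the rule is deleted (the patch removes it by setting the TTL to 0 and
    // calling DeleteRegionLabelRule on shutdown).
    fmt.Println(string(out))
}
```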
diff --git a/br/pkg/pdutil/pd_serial_test.go b/br/pkg/pdutil/pd_serial_test.go
index 05f0d34aa2ef2..608830fe190fe 100644
--- a/br/pkg/pdutil/pd_serial_test.go
+++ b/br/pkg/pdutil/pd_serial_test.go
@@ -12,7 +12,9 @@ import (
     "net/http"
     "net/http/httptest"
     "net/url"
+    "strings"
     "testing"
+    "time"
 
     "github.com/coreos/go-semver/semver"
     "github.com/pingcap/failpoint"
@@ -231,3 +233,47 @@ func TestStoreInfo(t *testing.T) {
     require.Equal(t, "Tombstone", resp.Store.StateName)
     require.Equal(t, uint64(1024), uint64(resp.Status.Available))
 }
+
+func TestPauseSchedulersByKeyRange(t *testing.T) {
+    const ttl = time.Second
+
+    labelExpires := make(map[string]time.Time)
+
+    httpSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        if r.Method == http.MethodDelete {
+            ruleID := strings.TrimPrefix(r.URL.Path, "/"+regionLabelPrefix+"/")
+            print(ruleID)
+            delete(labelExpires, ruleID)
+            return
+        }
+        var labelRule LabelRule
+        err := json.NewDecoder(r.Body).Decode(&labelRule)
+        require.NoError(t, err)
+        require.Len(t, labelRule.Labels, 1)
+        regionLabel := labelRule.Labels[0]
+        require.Equal(t, "schedule", regionLabel.Key)
+        require.Equal(t, "deny", regionLabel.Value)
+        reqTTL, err := time.ParseDuration(regionLabel.TTL)
+        require.NoError(t, err)
+        if reqTTL == 0 {
+            delete(labelExpires, labelRule.ID)
+        } else {
+            require.Equal(t, ttl, reqTTL)
+            if expire, ok := labelExpires[labelRule.ID]; ok {
+                require.True(t, expire.After(time.Now()), "should not expire before now")
+            }
+            labelExpires[labelRule.ID] = time.Now().Add(ttl)
+        }
+    }))
+    defer httpSrv.Close()
+
+    pdController := &PdController{addrs: []string{httpSrv.URL}, cli: http.DefaultClient}
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    done, err := pdController.pauseSchedulerByKeyRangeWithTTL(ctx, []byte{0, 0, 0, 0}, []byte{0xff, 0xff, 0xff, 0xff}, ttl)
+    require.NoError(t, err)
+    time.Sleep(ttl * 3)
+    cancel()
+    <-done
+    require.Len(t, labelExpires, 0)
+}
diff --git a/br/tests/lightning_disable_scheduler_by_key_range/config.toml b/br/tests/lightning_disable_scheduler_by_key_range/config.toml
new file mode 100644
index 0000000000000..dce628f6a61c4
--- /dev/null
+++ b/br/tests/lightning_disable_scheduler_by_key_range/config.toml
@@ -0,0 +1,2 @@
+[lightning]
+status-addr = ":8289"
diff --git a/br/tests/lightning_disable_scheduler_by_key_range/data/test-schema-create.sql b/br/tests/lightning_disable_scheduler_by_key_range/data/test-schema-create.sql
new file mode 100644
index 0000000000000..14379bd68472a
--- /dev/null
+++ b/br/tests/lightning_disable_scheduler_by_key_range/data/test-schema-create.sql
@@ -0,0 +1 @@
+CREATE DATABASE test;
diff --git a/br/tests/lightning_disable_scheduler_by_key_range/data/test.t-schema.sql b/br/tests/lightning_disable_scheduler_by_key_range/data/test.t-schema.sql
new file mode 100644
index 0000000000000..57a1b65732950
--- /dev/null
+++ b/br/tests/lightning_disable_scheduler_by_key_range/data/test.t-schema.sql
@@ -0,0 +1 @@
+CREATE TABLE t(a INT PRIMARY KEY, b int);
diff --git a/br/tests/lightning_disable_scheduler_by_key_range/data/test.t.sql b/br/tests/lightning_disable_scheduler_by_key_range/data/test.t.sql
new file mode 100644
index 0000000000000..30e06b42e169b
--- /dev/null
+++ b/br/tests/lightning_disable_scheduler_by_key_range/data/test.t.sql
@@ -0,0 +1 @@
+INSERT INTO t VALUES (1,1);
diff --git a/br/tests/lightning_disable_scheduler_by_key_range/run.sh b/br/tests/lightning_disable_scheduler_by_key_range/run.sh
new file mode 100644
index 0000000000000..9df6067c8baf8
--- /dev/null
+++ b/br/tests/lightning_disable_scheduler_by_key_range/run.sh
@@ -0,0 +1,65 @@
+#!/bin/bash
+#
+# Copyright 2022 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eux
+
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/EnableTestAPI=return"
+export GO_FAILPOINTS="${GO_FAILPOINTS};github.com/pingcap/tidb/br/pkg/lightning/backend/local/ReadyForImportEngine=sleep(10000)"
+
+run_lightning --backend='local' &
+shpid="$!"
+pid=
+
+ensure_lightning_is_started() {
+  for _ in {0..60}; do
+    pid=$(pstree -p "$shpid" | grep -Eo "tidb-lightning\.\([0-9]*\)" | grep -Eo "[0-9]*") || true
+    [ -n "$pid" ] && break
+    sleep 1
+  done
+  if [ -z "$pid" ]; then
+    echo "lightning doesn't start successfully, please check the log" >&2
+    exit 1
+  fi
+  echo "lightning is started, pid is $pid"
+}
+
+ready_for_import_engine() {
+  for _ in {0..60}; do
+    grep -Fq "start import engine" "$TEST_DIR"/lightning.log && return
+    sleep 1
+  done
+  echo "lightning doesn't start import engine, please check the log" >&2
+  exit 1
+}
+
+ensure_lightning_is_started
+ready_for_import_engine
+
+run_curl "https://${PD_ADDR}/pd/api/v1/config/cluster-version"
+
+length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length')
+if [ "$length" != "1" ]; then
+  echo "region-label key-range rules should be 1, but got $length" >&2
+  exit 1
+fi
+
+wait "$shpid"
+
+length=$(run_curl "https://${PD_ADDR}/pd/api/v1/config/region-label/rules" | jq 'select(.[].rule_type == "key-range") | length')
+if [ -n "$length" ] && [ "$length" -ne 0 ]; then
+  echo "region-label key-range rules should be 0, but got $length" >&2
+  exit 1
+fi
diff --git a/br/tests/lightning_incremental/run.sh b/br/tests/lightning_incremental/run.sh
index 4cdd5a53ec74b..a025e7bebc6f1 100644
--- a/br/tests/lightning_incremental/run.sh
+++ b/br/tests/lightning_incremental/run.sh
@@ -25,8 +25,6 @@ run_lightning_and_check_meta() {
   check_not_contains "Database: lightning_metadata"
 }
 
-DB_NAME=incr
-
 run_sql "DROP DATABASE IF EXISTS incr;"
 run_sql "DROP DATABASE IF EXISTS lightning_metadata;"
 run_lightning_and_check_meta
@@ -48,8 +46,8 @@ for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do
 done
 
 for tbl in pk_auto_inc rowid_uk_inc; do
-  run_sql "SELECT group_concat(v) from incr.$tbl group by 'all';"
-  check_contains "group_concat(v): a,b,c"
+  run_sql "SELECT group_concat(v order by v) as result from incr.$tbl group by 'all';"
+  check_contains "result: a,b,c"
 done
 
 run_sql "SELECT sum(pk) from incr.uk_auto_inc;"
@@ -75,8 +73,8 @@ for tbl in auto_random pk_auto_inc rowid_uk_inc uk_auto_inc; do
 done
 
 for tbl in pk_auto_inc rowid_uk_inc; do
-  run_sql "SELECT group_concat(v) from incr.$tbl group by 'all';"
-  check_contains "group_concat(v): a,b,c,d,e,f"
+  run_sql "SELECT group_concat(v order by v) as result from incr.$tbl group by 'all';"
+  check_contains "result: a,b,c,d,e,f"
 done
 
 run_sql "SELECT sum(pk) from incr.uk_auto_inc;"