diff --git a/client/Makefile b/client/Makefile index b55b6fc8d9d..fd34ef157bf 100644 --- a/client/Makefile +++ b/client/Makefile @@ -19,21 +19,24 @@ SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) default: static tidy test test: - CGO_ENABLE=1 go test -race -cover + CGO_ENABLE=1 go test ./... -race -cover basic-test: - CGO_ENABLE=1 go test + CGO_ENABLE=1 go test ./... ci-test-job: - CGO_ENABLED=1 go test -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client + CGO_ENABLED=1 go test ./... -race -covermode=atomic -coverprofile=covprofile -coverpkg=../... github.com/tikv/pd/client install-tools: cd .. && $(MAKE) install-tools static: install-tools + @ echo "gofmt ..." @ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }' - @ golangci-lint run -c ../.golangci.yml ./... - @ revive -formatter friendly -config ../revive.toml . + @ echo "golangci-lint ..." + @ golangci-lint run -c ../.golangci.yml --verbose ./... + @ echo "revive ..." + @ revive -formatter friendly -config ../revive.toml ./... tidy: @ go mod tidy diff --git a/client/errs/errno.go b/client/errs/errno.go index 98fe9451d1e..646af81929d 100644 --- a/client/errs/errno.go +++ b/client/errs/errno.go @@ -32,7 +32,8 @@ const ( // NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it. // Note: keep the same as the ones defined on the server side, because the client side checks if an error message // contains this string to judge whether the leader is changed. - NotServedErr = "is not served" + NotServedErr = "is not served" + // RetryTimeoutErr indicates the server is busy. RetryTimeoutErr = "retry timeout" ) @@ -91,6 +92,7 @@ var ( ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled")) ) +// ErrClientGetResourceGroup is the error type for getting resource group. 
type ErrClientGetResourceGroup struct { ResourceGroupName string Cause string diff --git a/client/go.mod b/client/go.mod index b323ed33d5c..5281f99c0d8 100644 --- a/client/go.mod +++ b/client/go.mod @@ -8,7 +8,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 + github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.1 github.com/stretchr/testify v1.8.2 diff --git a/client/go.sum b/client/go.sum index 0c7ea93ae95..3629d89f6a4 100644 --- a/client/go.sum +++ b/client/go.sum @@ -86,8 +86,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 h1:VM6INL8StTPYMKufyHRX2hPUMP7isHnkYvtRMA7Sdsc= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/client/keyspace_client.go b/client/keyspace_client.go index ec183e5659c..d9b9172dd69 100644 --- a/client/keyspace_client.go +++ b/client/keyspace_client.go @@ -21,19 +21,19 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/keyspacepb" - "github.com/pingcap/log" "github.com/tikv/pd/client/grpcutil" - "go.uber.org/zap" ) // KeyspaceClient manages keyspace metadata. type KeyspaceClient interface { // LoadKeyspace load and return target keyspace's metadata. LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error) - // WatchKeyspaces watches keyspace meta changes. - WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) // UpdateKeyspaceState updates target keyspace's state. UpdateKeyspaceState(ctx context.Context, id uint32, state keyspacepb.KeyspaceState) (*keyspacepb.KeyspaceMeta, error) + // WatchKeyspaces watches keyspace meta changes. + WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) + // GetAllKeyspaces get all keyspace's metadata. + GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error) } // keyspaceClient returns the KeyspaceClient from current PD leader. @@ -75,44 +75,6 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key return resp.Keyspace, nil } -// WatchKeyspaces watches keyspace meta changes. -// It returns a stream of slices of keyspace metadata. 
-// The first message in stream contains all current keyspaceMeta, -// all subsequent messages contains new put events for all keyspaces. -func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) { - keyspaceWatcherChan := make(chan []*keyspacepb.KeyspaceMeta) - req := &keyspacepb.WatchKeyspacesRequest{ - Header: c.requestHeader(), - } - stream, err := c.keyspaceClient().WatchKeyspaces(ctx, req) - if err != nil { - close(keyspaceWatcherChan) - return nil, err - } - go func() { - defer func() { - close(keyspaceWatcherChan) - if r := recover(); r != nil { - log.Error("[pd] panic in keyspace client `WatchKeyspaces`", zap.Any("error", r)) - return - } - }() - for { - select { - case <-ctx.Done(): - return - default: - resp, err := stream.Recv() - if err != nil { - return - } - keyspaceWatcherChan <- resp.Keyspaces - } - } - }() - return keyspaceWatcherChan, err -} - // UpdateKeyspaceState attempts to update the keyspace specified by ID to the target state, // it will also record StateChangedAt for the given keyspace if a state change took place. // Currently, legal operations includes: @@ -153,3 +115,43 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp return resp.Keyspace, nil } + +// WatchKeyspaces watches keyspace meta changes. +// It returns a stream of slices of keyspace metadata. +// The first message in stream contains all current keyspaceMeta, +// all subsequent messages contains new put events for all keyspaces. +func (c *client) WatchKeyspaces(ctx context.Context) (chan []*keyspacepb.KeyspaceMeta, error) { + return nil, errors.Errorf("WatchKeyspaces unimplemented") +} + +// GetAllKeyspaces get all keyspaces metadata. +func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint32) ([]*keyspacepb.KeyspaceMeta, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("keyspaceClient.GetAllKeyspaces", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }() + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &keyspacepb.GetAllKeyspacesRequest{ + Header: c.requestHeader(), + StartId: startID, + Limit: limit, + } + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + resp, err := c.keyspaceClient().GetAllKeyspaces(ctx, req) + cancel() + + if err != nil { + cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) + c.pdSvcDiscovery.ScheduleCheckMemberChanged() + return nil, err + } + + if resp.Header.GetError() != nil { + cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) + return nil, errors.Errorf("Get all keyspaces metadata failed: %s", resp.Header.GetError().String()) + } + + return resp.Keyspaces, nil +} diff --git a/client/metrics.go b/client/metrics.go index e88711d1da9..1895306eca2 100644 --- a/client/metrics.go +++ b/client/metrics.go @@ -139,6 +139,7 @@ var ( cmdDurationSplitAndScatterRegions prometheus.Observer cmdDurationLoadKeyspace prometheus.Observer cmdDurationUpdateKeyspaceState prometheus.Observer + cmdDurationGetAllKeyspaces prometheus.Observer cmdDurationGet prometheus.Observer cmdDurationPut prometheus.Observer cmdDurationUpdateGCSafePointV2 prometheus.Observer @@ -184,6 +185,7 @@ func initCmdDurations() { cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions") cmdDurationLoadKeyspace = cmdDuration.WithLabelValues("load_keyspace") 
cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state") + cmdDurationGetAllKeyspaces = cmdDuration.WithLabelValues("get_all_keyspaces") cmdDurationGet = cmdDuration.WithLabelValues("get") cmdDurationPut = cmdDuration.WithLabelValues("put") cmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2") diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index cbf0be54c8d..2095bc60601 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -77,8 +77,8 @@ const ( defaultAvgBatchProportion = 0.7 ) -// ControllerConfig is the configuration of the resource manager controller which includes some option for client needed. -type ControllerConfig struct { +// Config is the configuration of the resource manager controller which includes some option for client needed. +type Config struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` @@ -87,9 +87,9 @@ type ControllerConfig struct { RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` } -// DefaultControllerConfig returns the default resource manager controller configuration. -func DefaultControllerConfig() *ControllerConfig { - return &ControllerConfig{ +// DefaultConfig returns the default resource manager controller configuration. +func DefaultConfig() *Config { + return &Config{ DegradedModeWaitDuration: defaultDegradedModeWaitDuration, RequestUnit: DefaultRequestUnitConfig(), } @@ -130,10 +130,10 @@ func DefaultRequestUnitConfig() RequestUnitConfig { } } -// Config is the configuration of the resource units, which gives the read/write request +// RUConfig is the configuration of the resource units, which gives the read/write request // units or request resource cost standards. It should be calculated by a given `RequestUnitConfig` // or `RequestResourceConfig`. -type Config struct { +type RUConfig struct { // RU model config ReadBaseCost RequestUnit ReadPerBatchBaseCost RequestUnit @@ -148,16 +148,16 @@ type Config struct { DegradedModeWaitDuration time.Duration } -// DefaultConfig returns the default configuration. -func DefaultConfig() *Config { - return GenerateConfig( - DefaultControllerConfig(), +// DefaultRUConfig returns the default configuration. +func DefaultRUConfig() *RUConfig { + return GenerateRUConfig( + DefaultConfig(), ) } -// GenerateConfig generates the configuration by the given request unit configuration. -func GenerateConfig(config *ControllerConfig) *Config { - cfg := &Config{ +// GenerateRUConfig generates the configuration by the given request unit configuration. +func GenerateRUConfig(config *Config) *RUConfig { + cfg := &RUConfig{ ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost), ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost), ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte), diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index cc4595f7e42..7eb985af1ba 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -78,25 +78,25 @@ type ResourceControlCreateOption func(controller *ResourceGroupsController) // EnableSingleGroupByKeyspace is the option to enable single group by keyspace feature. 
func EnableSingleGroupByKeyspace() ResourceControlCreateOption { return func(controller *ResourceGroupsController) { - controller.config.isSingleGroupByKeyspace = true + controller.ruConfig.isSingleGroupByKeyspace = true } } // WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets. func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption { return func(controller *ResourceGroupsController) { - controller.config.maxWaitDuration = d + controller.ruConfig.maxWaitDuration = d } } var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil) -// ResourceGroupsController impls ResourceGroupKVInterceptor. +// ResourceGroupsController implements ResourceGroupKVInterceptor. type ResourceGroupsController struct { clientUniqueID uint64 provider ResourceGroupProvider groupsController sync.Map - config *Config + ruConfig *RUConfig loopCtx context.Context loopCancel func() @@ -128,19 +128,19 @@ func NewResourceGroupController( requestUnitConfig *RequestUnitConfig, opts ...ResourceControlCreateOption, ) (*ResourceGroupsController, error) { - controllerConfig, err := loadServerConfig(ctx, provider) + config, err := loadServerConfig(ctx, provider) if err != nil { return nil, err } if requestUnitConfig != nil { - controllerConfig.RequestUnit = *requestUnitConfig + config.RequestUnit = *requestUnitConfig } - log.Info("load resource controller config", zap.Reflect("config", controllerConfig)) - config := GenerateConfig(controllerConfig) + log.Info("load resource controller config", zap.Reflect("config", config)) + ruConfig := GenerateRUConfig(config) controller := &ResourceGroupsController{ clientUniqueID: clientUniqueID, provider: provider, - config: config, + ruConfig: ruConfig, lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), @@ -148,30 +148,30 @@ func NewResourceGroupController( for _, opt := range opts { opt(controller) } - controller.calculators = []ResourceCalculator{newKVCalculator(controller.config), newSQLCalculator(controller.config)} + controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} return controller, nil } -func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*ControllerConfig, error) { +func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Config, error) { items, _, err := provider.LoadGlobalConfig(ctx, nil, controllerConfigPath) if err != nil { return nil, err } if len(items) == 0 { log.Warn("[resource group controller] server does not save config, load config failed") - return DefaultControllerConfig(), nil + return DefaultConfig(), nil } - controllerConfig := &ControllerConfig{} - err = json.Unmarshal(items[0].PayLoad, controllerConfig) + config := &Config{} + err = json.Unmarshal(items[0].PayLoad, config) if err != nil { return nil, err } - return controllerConfig, nil + return config, nil } // GetConfig returns the config of controller. It's only used for test. 
-func (c *ResourceGroupsController) GetConfig() *Config { - return c.config +func (c *ResourceGroupsController) GetConfig() *RUConfig { + return c.ruConfig } // Source List @@ -184,8 +184,8 @@ const ( func (c *ResourceGroupsController) Start(ctx context.Context) { c.loopCtx, c.loopCancel = context.WithCancel(ctx) go func() { - if c.config.DegradedModeWaitDuration > 0 { - c.run.responseDeadline = time.NewTimer(c.config.DegradedModeWaitDuration) + if c.ruConfig.DegradedModeWaitDuration > 0 { + c.run.responseDeadline = time.NewTimer(c.ruConfig.DegradedModeWaitDuration) c.run.responseDeadline.Stop() defer c.run.responseDeadline.Stop() } @@ -214,11 +214,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { log.Warn("load resource group revision failed", zap.Error(err)) } var watchChannel chan []*meta_storagepb.Event - if !c.config.isSingleGroupByKeyspace { + if !c.ruConfig.isSingleGroupByKeyspace { watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix()) } watchRetryTimer := time.NewTimer(watchRetryInterval) - if err == nil || c.config.isSingleGroupByKeyspace { + if err == nil || c.ruConfig.isSingleGroupByKeyspace { watchRetryTimer.Stop() } defer watchRetryTimer.Stop() @@ -259,7 +259,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) case resp, ok := <-watchChannel: failpoint.Inject("disableWatch", func() { - if c.config.isSingleGroupByKeyspace { + if c.ruConfig.isSingleGroupByKeyspace { panic("disableWatch") } }) @@ -335,7 +335,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name return gc, nil } // Initialize the resource group controller. - gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) + gc, err := newGroupCostController(group, c.ruConfig, c.lowTokenNotifyChan, c.tokenBucketUpdateChan) if err != nil { return nil, err } @@ -425,8 +425,8 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, TargetRequestPeriodMs: uint64(defaultTargetPeriod / time.Millisecond), ClientUniqueId: c.clientUniqueID, } - if c.config.DegradedModeWaitDuration > 0 && c.responseDeadlineCh == nil { - c.run.responseDeadline.Reset(c.config.DegradedModeWaitDuration) + if c.ruConfig.DegradedModeWaitDuration > 0 && c.responseDeadlineCh == nil { + c.run.responseDeadline.Reset(c.ruConfig.DegradedModeWaitDuration) c.responseDeadlineCh = c.run.responseDeadline.C } go func() { @@ -485,7 +485,7 @@ type groupCostController struct { // invariant attributes name string mode rmpb.GroupMode - mainCfg *Config + mainCfg *RUConfig // meta info meta *rmpb.ResourceGroup metaLock sync.RWMutex @@ -574,7 +574,7 @@ type tokenCounter struct { func newGroupCostController( group *rmpb.ResourceGroup, - mainCfg *Config, + mainCfg *RUConfig, lowRUNotifyChan chan struct{}, tokenBucketUpdateChan chan *groupCostController, ) (*groupCostController, error) { @@ -1118,9 +1118,8 @@ func (gc *groupCostController) onRequestWait( sub(gc.mu.consumption, delta) gc.mu.Unlock() return nil, nil, err - } else { - gc.successfulRequestDuration.Observe(d.Seconds()) } + gc.successfulRequestDuration.Observe(d.Seconds()) } gc.mu.Lock() diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index d3b2a29c211..165d501ddb1 100644 --- a/client/resource_group/controller/controller_test.go +++ 
b/client/resource_group/controller/controller_test.go @@ -42,7 +42,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController } ch1 := make(chan struct{}) ch2 := make(chan *groupCostController) - gc, err := newGroupCostController(group, DefaultConfig(), ch1, ch2) + gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2) re.NoError(err) return gc } diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index a8038c6d8e5..f89ab17514c 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -285,6 +285,7 @@ type tokenBucketReconfigureArgs struct { NotifyThreshold float64 } +// LimiterOption configures Limiter. type LimiterOption func(*Limiter) func resetLowProcess() func(*Limiter) { @@ -386,6 +387,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur return r } +// ResetRemainingNotifyTimes resets the remaining notify times to 3. func (lim *Limiter) ResetRemainingNotifyTimes() { lim.mu.Lock() defer lim.mu.Unlock() diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index 2e05f66ab67..b8b96ae13d6 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -109,11 +109,19 @@ func TestReconfig(t *testing.T) { args := tokenBucketReconfigureArgs{ NewTokens: 6., NewRate: 2, - NewBurst: -1, } lim.Reconfigure(t1, args) checkTokens(re, lim, t1, 5) checkTokens(re, lim, t2, 7) + + args = tokenBucketReconfigureArgs{ + NewTokens: 6., + NewRate: 2, + NewBurst: -1, + } + lim.Reconfigure(t1, args) + checkTokens(re, lim, t1, 6) + checkTokens(re, lim, t2, 6) re.Equal(int64(-1), lim.GetBurst()) } diff --git a/client/resource_group/controller/model.go b/client/resource_group/controller/model.go index 55a6f9ec939..81f7e52716e 100644 --- a/client/resource_group/controller/model.go +++ b/client/resource_group/controller/model.go @@ -65,13 +65,13 @@ type ResourceCalculator interface { // KVCalculator is used to calculate the KV-side consumption. type KVCalculator struct { - *Config + *RUConfig } var _ ResourceCalculator = (*KVCalculator)(nil) -func newKVCalculator(cfg *Config) *KVCalculator { - return &KVCalculator{Config: cfg} +func newKVCalculator(cfg *RUConfig) *KVCalculator { + return &KVCalculator{RUConfig: cfg} } // Trickle ... @@ -146,13 +146,13 @@ func (kc *KVCalculator) payBackWriteCost(consumption *rmpb.Consumption, req Requ // SQLCalculator is used to calculate the SQL-side consumption. type SQLCalculator struct { - *Config + *RUConfig } var _ ResourceCalculator = (*SQLCalculator)(nil) -func newSQLCalculator(cfg *Config) *SQLCalculator { - return &SQLCalculator{Config: cfg} +func newSQLCalculator(cfg *RUConfig) *SQLCalculator { + return &SQLCalculator{RUConfig: cfg} } // Trickle update sql layer CPU consumption. 
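
Note on the rename above: the client-side resource controller settings are now split into two types, `Config` (the options synced from the server, formerly `ControllerConfig`) and `RUConfig` (the derived per-request RU costs, formerly `Config`). Below is a minimal usage sketch, assuming a `pdClient` value that satisfies `controller.ResourceGroupProvider` (e.g. the PD client); only the constructor, options, and config helpers shown in this diff are real, everything else is illustrative.

```go
package example

import (
	"context"
	"time"

	rgc "github.com/tikv/pd/client/resource_group/controller"
)

// newController is a sketch, not part of this change.
func newController(ctx context.Context, pdClient rgc.ResourceGroupProvider) (*rgc.ResourceGroupsController, error) {
	// Formerly DefaultControllerConfig()/GenerateConfig(); the controller does
	// this internally, shown here only to illustrate the renamed types.
	cfg := rgc.DefaultConfig()
	_ = rgc.GenerateRUConfig(cfg) // *rgc.RUConfig holding the per-request RU costs

	// Passing a nil RequestUnitConfig keeps the request-unit settings loaded
	// from the server; the options act on the renamed ruConfig field.
	return rgc.NewResourceGroupController(ctx, 1 /* clientUniqueID */, pdClient, nil,
		rgc.EnableSingleGroupByKeyspace(),
		rgc.WithMaxWaitDuration(30*time.Second),
	)
}
```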
diff --git a/client/resource_group/controller/model_test.go b/client/resource_group/controller/model_test.go index 94007485520..594091da364 100644 --- a/client/resource_group/controller/model_test.go +++ b/client/resource_group/controller/model_test.go @@ -37,14 +37,6 @@ func TestGetRUValueFromConsumption(t *testing.T) { result = getRUValueFromConsumption(custom, typ) re.Equal(expected, result) - - // When typ is not RU - custom = &rmpb.Consumption{RRU: 2.5, WRU: 3.5} - typ = rmpb.RequestUnitType_RU - expected = float64(0) - - result = getRUValueFromConsumption(custom, typ) - re.Equal(expected, result) } func TestGetRUTokenBucketSetting(t *testing.T) { @@ -69,18 +61,6 @@ func TestGetRUTokenBucketSetting(t *testing.T) { if result != expected { t.Errorf("Expected nil but got %v", result) } - - // When typ is not RU - group = &rmpb.ResourceGroup{ - RUSettings: &rmpb.GroupRequestUnitSettings{ - RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 100}}, - }, - } - typ = rmpb.RequestUnitType_RU - expected = nil - - result = getRUTokenBucketSetting(group, typ) - re.Equal(expected, result) } func TestGetRawResourceValueFromConsumption(t *testing.T) { diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go index 81dc8a154b0..4df8c9bba0d 100644 --- a/client/resource_group/controller/testutil.go +++ b/client/resource_group/controller/testutil.go @@ -51,7 +51,7 @@ func (tri *TestRequestInfo) StoreID() uint64 { return tri.storeID } -// ReplicaNums implements the RequestInfo interface. +// ReplicaNumber implements the RequestInfo interface. func (tri *TestRequestInfo) ReplicaNumber() int64 { return 1 } @@ -63,6 +63,7 @@ type TestResponseInfo struct { succeed bool } +// NewTestResponseInfo creates a new TestResponseInfo. func NewTestResponseInfo(readBytes uint64, kvCPU time.Duration, succeed bool) *TestResponseInfo { return &TestResponseInfo{ readBytes: readBytes, diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 4b9896dfefa..4544c288896 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -36,6 +36,8 @@ const ( groupSettingsPathPrefix = "resource_group/settings" // errNotPrimary is returned when the requested server is not primary. errNotPrimary = "not primary" + // errNotLeader is returned when the requested server is not pd leader. + errNotLeader = "not leader" ) // GroupSettingsPathPrefixBytes is used to watch or get resource groups. @@ -65,7 +67,7 @@ func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) { // gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service. func (c *client) gRPCErrorHandler(err error) { - if strings.Contains(err.Error(), errNotPrimary) { + if strings.Contains(err.Error(), errNotPrimary) || strings.Contains(err.Error(), errNotLeader) { c.pdSvcDiscovery.ScheduleCheckMemberChanged() } } diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index 821f88a76b9..d21f1588bea 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -212,7 +212,10 @@ func start(cmd *cobra.Command, args []string, services ...string) { // Creates server. 
ctx, cancel := context.WithCancel(context.Background()) - serviceBuilders := []server.HandlerBuilder{api.NewHandler, apiv2.NewV2Handler, swaggerserver.NewHandler, autoscaling.NewHandler} + serviceBuilders := []server.HandlerBuilder{api.NewHandler, apiv2.NewV2Handler, autoscaling.NewHandler} + if swaggerserver.Enabled() { + serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler) + } serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...) svr, err := server.CreateServer(ctx, cfg, services, serviceBuilders...) if err != nil { diff --git a/go.mod b/go.mod index 61720604c3c..b112556085a 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 + github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27 diff --git a/go.sum b/go.sum index 0e9c275d18b..cda1439fcec 100644 --- a/go.sum +++ b/go.sum @@ -424,8 +424,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 h1:VM6INL8StTPYMKufyHRX2hPUMP7isHnkYvtRMA7Sdsc= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 3909dc670c6..f07f4a30584 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -5136,6 +5136,111 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "decimals": 2, + "description": "The total keys of hot write on leader Regions for each TiKV instance", + "fill": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 31 + }, + "id": 1465, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "links": [], + "nullPointMode": "null", + "paceLength": 10, + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": 
false, + "targets": [ + { + "expr": "pd_hotspot_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\", instance=\"$instance\", type=\"total_write_bytes_as_leader\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{address}}-store-{{store}}", + "metric": "pd_hotspot_status", + "refId": "A", + "step": 4 + }, + { + "exemplar": true, + "expr": "pd_scheduler_hot_peers_summary{type=\"exp-byte-rate-write-leader\"}", + "hide": true, + "interval": "", + "legendFormat": "exp-byte-rate-write-leader-{{store}}", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Total written bytes on hot leader Regions", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": null, + "format": "ops", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -5260,7 +5365,7 @@ "h": 7, "w": 12, "x": 12, - "y": 31 + "y": 38 }, "id": 1445, "legend": { @@ -5454,12 +5559,13 @@ "dashes": false, "datasource": "${DS_TEST-CLUSTER}", "decimals": 0, + "description": "The select events of hot read scheduler", "fill": 0, "gridPos": { "h": 7, "w": 12, "x": 12, - "y": 38 + "y": 45 }, "id": 106, "legend": { @@ -5492,7 +5598,7 @@ "steppedLine": false, "targets": [ { - "expr": "rate(pd_scheduler_hot_region{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\", type=~\".*store.*\"}[1m])", + "expr": "rate(pd_scheduler_hot_region{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\", type=~\".*write.*\"}[1m])", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{type}}-{{store}}", @@ -5504,7 +5610,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Selector events", + "title": "Selector write events", "tooltip": { "shared": true, "sort": 0, @@ -5656,7 +5762,7 @@ "h": 7, "w": 12, "x": 12, - "y": 45 + "y": 52 }, "id": 148, "legend": { @@ -6737,6 +6843,102 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "decimals": 0, + "description": "The select events of hot read scheduler", + "fill": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 81 + }, + "id": 1466, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "paceLength": 10, + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "rate(pd_scheduler_hot_region{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\", type=~\".*read.*\"}[1m])", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{type}}-{{store}}", + "refId": "A", + "step": 4 + } + ], + "thresholds": [], + "timeFrom": null, + 
"timeRegions": [], + "timeShift": null, + "title": "Selector read events", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": null, + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "repeat": null, @@ -11964,7 +12166,7 @@ "x": 12, "y": 48 }, - "id": 1455, + "id": 1467, "interval": "", "options": { "displayMode": "lcd", diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index ad61fe0a3fa..3d208aef2a2 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -48,6 +48,8 @@ const ( // UserKindKey is the key for user kind in keyspace config. UserKindKey = "user_kind" // TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config. + // Note: Config[TSOKeyspaceGroupIDKey] is only used to judge whether there is keyspace group id. + // It will not update the keyspace group id when merging or splitting. TSOKeyspaceGroupIDKey = "tso_keyspace_group_id" // MaxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128. // We use 120 here to leave some space for other operations. @@ -783,7 +785,9 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID if err != nil { return err } + manager.kgm.Lock() manager.kgm.groups[endpoint.StringUserKind(defaultKeyspaceGroup.UserKind)].Put(defaultKeyspaceGroup) + manager.kgm.Unlock() // If all keyspaces in the current batch are assigned, update the next start ID. manager.nextPatrolStartID = nextStartID } diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 88d478fd50f..084380549fc 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -388,6 +388,20 @@ func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind) return config, nil } +// GetGroupByKeyspaceID returns the keyspace group ID for the given keyspace ID. +func (m *GroupManager) GetGroupByKeyspaceID(id uint32) (uint32, error) { + m.RLock() + defer m.RUnlock() + for _, groups := range m.groups { + for _, group := range groups.GetAll() { + if slice.Contains(group.Keyspaces, id) { + return group.ID, nil + } + } + } + return 0, ErrKeyspaceNotInAnyKeyspaceGroup +} + var failpointOnce sync.Once // UpdateKeyspaceForGroup updates the keyspace field for the keyspace group. diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index 100b0eb6986..2923dc7053f 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -70,6 +70,8 @@ var ( } // ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group. ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group") + // ErrKeyspaceNotInAnyKeyspaceGroup is used to indicate target keyspace is not in any keyspace group. + ErrKeyspaceNotInAnyKeyspaceGroup = errors.New("keyspace is not in any keyspace group") // ErrNodeNotInKeyspaceGroup is used to indicate the tso node is not in this keyspace group. ErrNodeNotInKeyspaceGroup = errors.New("the tso node is not in this keyspace group") // ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group. 
diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 6d939fde540..00e168114b0 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -21,7 +21,7 @@ import ( // Discover is used to get all the service instances of the specified service name. func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) { - key := ServicePath(clusterID, serviceName) + "/" + key := ServicePath(clusterID, serviceName) endKey := clientv3.GetPrefixRangeEnd(key) withRange := clientv3.WithRange(endKey) diff --git a/pkg/mcs/discovery/key_path.go b/pkg/mcs/discovery/key_path.go index d77aa98f3a9..b7bf9d1cac3 100644 --- a/pkg/mcs/discovery/key_path.go +++ b/pkg/mcs/discovery/key_path.go @@ -32,10 +32,10 @@ func RegistryPath(clusterID, serviceName, serviceAddr string) string { // ServicePath returns the path to store microservice addresses. func ServicePath(clusterID, serviceName string) string { - return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey}, "/") + return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey, ""}, "/") } // TSOPath returns the path to store TSO addresses. func TSOPath(clusterID uint64) string { - return ServicePath(strconv.FormatUint(clusterID, 10), "tso") + "/" + return ServicePath(strconv.FormatUint(clusterID, 10), "tso") } diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index b054a37e0ac..321012bb7d4 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -127,23 +127,26 @@ func (m *Manager) Init(ctx context.Context) { } m.storage.LoadResourceGroupStates(tokenHandler) - // Add default group - defaultGroup := &ResourceGroup{ - Name: reservedDefaultGroupName, - Mode: rmpb.GroupMode_RUMode, - RUSettings: &RequestUnitSettings{ - RU: &GroupTokenBucket{ - Settings: &rmpb.TokenLimitSettings{ - FillRate: math.MaxInt32, - BurstLimit: -1, + // Add default group if it's not inited. + if _, ok := m.groups[reservedDefaultGroupName]; !ok { + defaultGroup := &ResourceGroup{ + Name: reservedDefaultGroupName, + Mode: rmpb.GroupMode_RUMode, + RUSettings: &RequestUnitSettings{ + RU: &GroupTokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: math.MaxInt32, + BurstLimit: -1, + }, }, }, - }, - Priority: middlePriority, - } - if err := m.AddResourceGroup(defaultGroup.IntoProtoResourceGroup()); err != nil { - log.Warn("init default group failed", zap.Error(err)) + Priority: middlePriority, + } + if err := m.AddResourceGroup(defaultGroup.IntoProtoResourceGroup()); err != nil { + log.Warn("init default group failed", zap.Error(err)) + } } + // Start the background metrics flusher. go m.backgroundMetricsFlush(ctx) go func() { diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go new file mode 100644 index 00000000000..26eab6d9424 --- /dev/null +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -0,0 +1,94 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package apis + +import ( + "net/http" + "sync" + + "github.com/gin-contrib/cors" + "github.com/gin-contrib/gzip" + "github.com/gin-contrib/pprof" + "github.com/gin-gonic/gin" + "github.com/joho/godotenv" + scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/unrolled/render" +) + +// APIPathPrefix is the prefix of the API path. +const APIPathPrefix = "/scheduling/api/v1/" + +var ( + once sync.Once + apiServiceGroup = apiutil.APIServiceGroup{ + Name: "scheduling", + Version: "v1", + IsCore: false, + PathPrefix: APIPathPrefix, + } +) + +func init() { + scheserver.SetUpRestHandler = func(srv *scheserver.Service) (http.Handler, apiutil.APIServiceGroup) { + s := NewService(srv) + return s.apiHandlerEngine, apiServiceGroup + } +} + +// Service is the tso service. +type Service struct { + apiHandlerEngine *gin.Engine + root *gin.RouterGroup + + srv *scheserver.Service + rd *render.Render +} + +func createIndentRender() *render.Render { + return render.New(render.Options{ + IndentJSON: true, + }) +} + +// NewService returns a new Service. +func NewService(srv *scheserver.Service) *Service { + once.Do(func() { + // These global modification will be effective only for the first invoke. + _ = godotenv.Load() + gin.SetMode(gin.ReleaseMode) + }) + apiHandlerEngine := gin.New() + apiHandlerEngine.Use(gin.Recovery()) + apiHandlerEngine.Use(cors.Default()) + apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression)) + apiHandlerEngine.Use(func(c *gin.Context) { + c.Set(multiservicesapi.ServiceContextKey, srv) + c.Next() + }) + apiHandlerEngine.Use(multiservicesapi.ServiceRedirector()) + apiHandlerEngine.GET("metrics", utils.PromHandler()) + pprof.Register(apiHandlerEngine) + root := apiHandlerEngine.Group(APIPathPrefix) + s := &Service{ + srv: srv, + apiHandlerEngine: apiHandlerEngine, + root: root, + rd: createIndentRender(), + } + return s +} diff --git a/pkg/mcs/scheduling/server/config.go b/pkg/mcs/scheduling/server/config.go new file mode 100644 index 00000000000..be628f4519d --- /dev/null +++ b/pkg/mcs/scheduling/server/config.go @@ -0,0 +1,172 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/BurntSushi/toml" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/spf13/pflag" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/configutil" + "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/metricutil" + "go.uber.org/zap" +) + +const ( + defaultName = "Scheduling" + defaultBackendEndpoints = "http://127.0.0.1:2379" + defaultListenAddr = "http://127.0.0.1:3379" +) + +// Config is the configuration for the scheduling. 
+type Config struct { + BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"` + ListenAddr string `toml:"listen-addr" json:"listen-addr"` + AdvertiseListenAddr string `toml:"advertise-listen-addr" json:"advertise-listen-addr"` + Name string `toml:"name" json:"name"` + DataDir string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring + EnableGRPCGateway bool `json:"enable-grpc-gateway"` // TODO: use it + + Metric metricutil.MetricConfig `toml:"metric" json:"metric"` + + // Log related config. + Log log.Config `toml:"log" json:"log"` + Logger *zap.Logger + LogProps *log.ZapProperties + + Security configutil.SecurityConfig `toml:"security" json:"security"` + + // WarningMsgs contains all warnings during parsing. + WarningMsgs []string + + // LeaderLease defines the time within which a Scheduling primary/leader must + // update its TTL in etcd, otherwise etcd will expire the leader key and other servers + // can campaign the primary/leader again. Etcd only supports seconds TTL, so here is + // second too. + LeaderLease int64 `toml:"lease" json:"lease"` +} + +// NewConfig creates a new config. +func NewConfig() *Config { + return &Config{} +} + +// Parse parses flag definitions from the argument list. +func (c *Config) Parse(flagSet *pflag.FlagSet) error { + // Load config file if specified. + var ( + meta *toml.MetaData + err error + ) + if configFile, _ := flagSet.GetString("config"); configFile != "" { + meta, err = configutil.ConfigFromFile(c, configFile) + if err != nil { + return err + } + } + + // Ignore the error check here + configutil.AdjustCommandlineString(flagSet, &c.Log.Level, "log-level") + configutil.AdjustCommandlineString(flagSet, &c.Log.File.Filename, "log-file") + configutil.AdjustCommandlineString(flagSet, &c.Metric.PushAddress, "metrics-addr") + configutil.AdjustCommandlineString(flagSet, &c.Security.CAPath, "cacert") + configutil.AdjustCommandlineString(flagSet, &c.Security.CertPath, "cert") + configutil.AdjustCommandlineString(flagSet, &c.Security.KeyPath, "key") + configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints") + configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr") + configutil.AdjustCommandlineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr") + + return c.Adjust(meta, false) +} + +// Adjust is used to adjust the scheduling configurations. 
+func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { + configMetaData := configutil.NewConfigMetadata(meta) + if err := configMetaData.CheckUndecoded(); err != nil { + c.WarningMsgs = append(c.WarningMsgs, err.Error()) + } + + if c.Name == "" { + hostname, err := os.Hostname() + if err != nil { + return err + } + configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname)) + } + configutil.AdjustString(&c.DataDir, fmt.Sprintf("default.%s", c.Name)) + configutil.AdjustPath(&c.DataDir) + + if err := c.Validate(); err != nil { + return err + } + + configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints) + configutil.AdjustString(&c.ListenAddr, defaultListenAddr) + configutil.AdjustString(&c.AdvertiseListenAddr, c.ListenAddr) + + if !configMetaData.IsDefined("enable-grpc-gateway") { + c.EnableGRPCGateway = utils.DefaultEnableGRPCGateway + } + + c.adjustLog(configMetaData.Child("log")) + c.Security.Encryption.Adjust() + + if len(c.Log.Format) == 0 { + c.Log.Format = utils.DefaultLogFormat + } + + configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease) + + return nil +} + +func (c *Config) adjustLog(meta *configutil.ConfigMetaData) { + if !meta.IsDefined("disable-error-verbose") { + c.Log.DisableErrorVerbose = utils.DefaultDisableErrorVerbose + } +} + +// GetTLSConfig returns the TLS config. +func (c *Config) GetTLSConfig() *grpcutil.TLSConfig { + return &c.Security.TLSConfig +} + +// Validate is used to validate if some configurations are right. +func (c *Config) Validate() error { + dataDir, err := filepath.Abs(c.DataDir) + if err != nil { + return errors.WithStack(err) + } + logFile, err := filepath.Abs(c.Log.File.Filename) + if err != nil { + return errors.WithStack(err) + } + rel, err := filepath.Rel(dataDir, filepath.Dir(logFile)) + if err != nil { + return errors.WithStack(err) + } + if !strings.HasPrefix(rel, "..") { + return errors.New("log directory shouldn't be the subdirectory of data directory") + } + + return nil +} diff --git a/pkg/mcs/scheduling/server/grpc_service.go b/pkg/mcs/scheduling/server/grpc_service.go new file mode 100644 index 00000000000..3b0e51f1f66 --- /dev/null +++ b/pkg/mcs/scheduling/server/grpc_service.go @@ -0,0 +1,71 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "net/http" + + "github.com/pingcap/log" + bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/mcs/registry" + "github.com/tikv/pd/pkg/utils/apiutil" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// gRPC errors +var ( + ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") + ErrClusterMismatched = status.Errorf(codes.Unavailable, "cluster mismatched") +) + +// SetUpRestHandler is a hook to sets up the REST service. 
+var SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) { + return dummyRestService{}, apiutil.APIServiceGroup{} +} + +type dummyRestService struct{} + +func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) + w.Write([]byte("not implemented")) +} + +// Service is the scheduling grpc service. +type Service struct { + *Server +} + +// NewService creates a new TSO service. +func NewService(svr bs.Server) registry.RegistrableService { + server, ok := svr.(*Server) + if !ok { + log.Fatal("create scheduling server failed") + } + return &Service{ + Server: server, + } +} + +// RegisterGRPCService registers the service to gRPC server. +func (s *Service) RegisterGRPCService(g *grpc.Server) { +} + +// RegisterRESTHandler registers the service to REST server. +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { + handler, group := SetUpRestHandler(s) + apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) +} diff --git a/pkg/swaggerserver/swagger_handler.go b/pkg/mcs/scheduling/server/metrics.go similarity index 51% rename from pkg/swaggerserver/swagger_handler.go rename to pkg/mcs/scheduling/server/metrics.go index 69cff3d2751..b3f5b7b41de 100644 --- a/pkg/swaggerserver/swagger_handler.go +++ b/pkg/mcs/scheduling/server/metrics.go @@ -1,4 +1,4 @@ -// Copyright 2020 TiKV Project Authors. +// Copyright 2023 TiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,18 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build swagger_server -// +build swagger_server +package server -package swaggerserver +import "github.com/prometheus/client_golang/prometheus" -import ( - "net/http" +const ( + namespace = "scheduling" + serverSubsystem = "server" +) - httpSwagger "github.com/swaggo/http-swagger" - _ "github.com/tikv/pd/docs/swagger" +var ( + // Meta & Server info. + serverInfo = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: serverSubsystem, + Name: "info", + Help: "Indicate the scheduling server info, and the value is the start timestamp (s).", + }, []string{"version", "hash"}) ) -func handler() http.Handler { - return httpSwagger.Handler() +func init() { + prometheus.MustRegister(serverInfo) } diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go new file mode 100644 index 00000000000..7b564e3433e --- /dev/null +++ b/pkg/mcs/scheduling/server/server.go @@ -0,0 +1,500 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package server + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "net/url" + "os" + "os/signal" + "path" + "strconv" + "strings" + "sync" + "sync/atomic" + "syscall" + "time" + + grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/log" + "github.com/pingcap/sysutil" + "github.com/soheilhy/cmux" + "github.com/spf13/cobra" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/metricutil" + "github.com/tikv/pd/pkg/versioninfo" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/types" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// Server is the scheduling server, and it implements bs.Server. +type Server struct { + diagnosticspb.DiagnosticsServer + // Server state. 0 is not running, 1 is running. + isRunning int64 + // Server start timestamp + startTimestamp int64 + + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel func() + serverLoopWg sync.WaitGroup + + cfg *Config + clusterID uint64 + name string + listenURL *url.URL + + // for the primary election of scheduling + participant *member.Participant + etcdClient *clientv3.Client + httpClient *http.Client + + muxListener net.Listener + service *Service + + // Callback functions for different stages + // startCallbacks will be called after the server is started. + startCallbacks []func() + // primaryCallbacks will be called after the server becomes leader. + primaryCallbacks []func(context.Context) + + serviceRegister *discovery.ServiceRegister +} + +// Name returns the unique etcd name for this server in etcd cluster. +func (s *Server) Name() string { + return s.name +} + +// Context returns the context. +func (s *Server) Context() context.Context { + return s.ctx +} + +// GetAddr returns the server address. +func (s *Server) GetAddr() string { + return s.cfg.ListenAddr +} + +// Run runs the Scheduling server. +func (s *Server) Run() (err error) { + if err = s.initClient(); err != nil { + return err + } + if err = s.startServer(); err != nil { + return err + } + + s.startServerLoop() + + return nil +} + +func (s *Server) startServerLoop() { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopWg.Add(1) + go s.primaryElectionLoop() +} + +func (s *Server) primaryElectionLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + for { + if s.IsClosed() { + log.Info("server is closed, exit scheduling primary election loop") + return + } + + primary, checkAgain := s.participant.CheckLeader() + if checkAgain { + continue + } + if primary != nil { + log.Info("start to watch the primary", zap.Stringer("scheduling-primary", primary)) + // Watch will keep looping and never return unless the primary/leader has changed. 
+ primary.Watch(s.serverLoopCtx) + log.Info("the scheduling primary has changed, try to re-campaign a primary") + } + + s.campaignLeader() + } +} + +func (s *Server) campaignLeader() { + log.Info("start to campaign the primary/leader", zap.String("campaign-scheduling-primary-name", s.participant.Name())) + if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err.Error() == errs.ErrEtcdTxnConflict.Error() { + log.Info("campaign scheduling primary meets error due to txn conflict, another server may campaign successfully", + zap.String("campaign-scheduling-primary-name", s.participant.Name())) + } else { + log.Error("campaign scheduling primary meets error due to etcd error", + zap.String("campaign-scheduling-primary-name", s.participant.Name()), + errs.ZapError(err)) + } + return + } + + // Start keepalive the leadership and enable Scheduling service. + ctx, cancel := context.WithCancel(s.serverLoopCtx) + var resetLeaderOnce sync.Once + defer resetLeaderOnce.Do(func() { + cancel() + s.participant.ResetLeader() + }) + + // maintain the leadership, after this, Scheduling could be ready to provide service. + s.participant.KeepLeader(ctx) + log.Info("campaign scheduling primary ok", zap.String("campaign-scheduling-primary-name", s.participant.Name())) + + log.Info("triggering the primary callback functions") + for _, cb := range s.primaryCallbacks { + cb(ctx) + } + + s.participant.EnableLeader() + log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) + + leaderTicker := time.NewTicker(utils.LeaderTickInterval) + defer leaderTicker.Stop() + + for { + select { + case <-leaderTicker.C: + if !s.participant.IsLeader() { + log.Info("no longer a primary/leader because lease has expired, the scheduling primary/leader will step down") + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Info("server is closed") + return + } + } +} + +// Close closes the server. +func (s *Server) Close() { + if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) { + // server is already closed + return + } + + log.Info("closing scheduling server ...") + s.serviceRegister.Deregister() + s.muxListener.Close() + s.serverLoopCancel() + s.serverLoopWg.Wait() + + if s.etcdClient != nil { + if err := s.etcdClient.Close(); err != nil { + log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) + } + } + + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + + log.Info("scheduling server is closed") +} + +// GetClient returns builtin etcd client. +func (s *Server) GetClient() *clientv3.Client { + return s.etcdClient +} + +// GetHTTPClient returns builtin http client. +func (s *Server) GetHTTPClient() *http.Client { + return s.httpClient +} + +// AddStartCallback adds a callback in the startServer phase. +func (s *Server) AddStartCallback(callbacks ...func()) { + s.startCallbacks = append(s.startCallbacks, callbacks...) +} + +// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) IsServing() bool { + return !s.IsClosed() && s.participant.IsLeader() +} + +// IsClosed checks if the server loop is closed +func (s *Server) IsClosed() bool { + return s != nil && atomic.LoadInt64(&s.isRunning) == 0 +} + +// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. 
+func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { + s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) +} + +func (s *Server) initClient() error { + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ",")) + if err != nil { + return err + } + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)[0]) + return err +} + +func (s *Server) startGRPCServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + gs := grpc.NewServer() + s.service.RegisterGRPCService(gs) + err := gs.Serve(l) + log.Info("gRPC server stop serving") + + // Attempt graceful stop (waits for pending RPCs), but force a stop if + // it doesn't happen in a reasonable amount of time. + done := make(chan struct{}) + go func() { + defer logutil.LogPanic() + log.Info("try to gracefully stop the server now") + gs.GracefulStop() + close(done) + }() + timer := time.NewTimer(utils.DefaultGRPCGracefulStopTimeout) + defer timer.Stop() + select { + case <-done: + case <-timer.C: + log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout)) + gs.Stop() + } + if s.IsClosed() { + log.Info("grpc server stopped") + } else { + log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startHTTPServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + handler, _ := SetUpRestHandler(s.service) + hs := &http.Server{ + Handler: handler, + ReadTimeout: 5 * time.Minute, + ReadHeaderTimeout: 5 * time.Second, + } + err := hs.Serve(l) + log.Info("http server stop serving") + + ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout) + defer cancel() + if err := hs.Shutdown(ctx); err != nil { + log.Error("http server shutdown encountered problem", errs.ZapError(err)) + } else { + log.Info("all http(s) requests finished") + } + if s.IsClosed() { + log.Info("http server stopped") + } else { + log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startGRPCAndHTTPServers(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + mux := cmux.New(l) + grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + httpL := mux.Match(cmux.Any()) + + s.serverLoopWg.Add(2) + go s.startGRPCServer(grpcL) + go s.startHTTPServer(httpL) + + if err := mux.Serve(); err != nil { + if s.IsClosed() { + log.Info("mux stop serving", errs.ZapError(err)) + } else { + log.Fatal("mux stop serving unexpectedly", errs.ZapError(err)) + } + } +} + +// GetLeaderListenUrls gets service endpoints from the leader in election group. 
+func (s *Server) GetLeaderListenUrls() []string { + return s.participant.GetLeaderListenUrls() +} + +func (s *Server) startServer() (err error) { + if s.clusterID, err = utils.InitClusterID(s.ctx, s.etcdClient); err != nil { + return err + } + log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) + // The independent Scheduling service still reuses PD version info since PD and Scheduling are just + // different service modes provided by the same pd-server binary + serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) + + uniqueName := s.cfg.ListenAddr + uniqueID := memberutil.GenerateUniqueID(uniqueName) + log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) + schedulingPrimaryPrefix := endpoint.SchedulingSvcRootPath(s.clusterID) + s.participant = member.NewParticipant(s.etcdClient) + s.participant.InitInfo(uniqueName, uniqueID, path.Join(schedulingPrimaryPrefix, fmt.Sprintf("%05d", 0)), + utils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", s.cfg.AdvertiseListenAddr) + + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + s.listenURL, err = url.Parse(s.cfg.ListenAddr) + if err != nil { + return err + } + if tlsConfig != nil { + s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig) + } else { + s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host) + } + if err != nil { + return err + } + + s.serverLoopWg.Add(1) + go s.startGRPCAndHTTPServers(s.muxListener) + + // Run callbacks + log.Info("triggering the start callback functions") + for _, cb := range s.startCallbacks { + cb() + } + + // Server has started. + entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} + serializedEntry, err := entry.Serialize() + if err != nil { + return err + } + s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, strconv.FormatUint(s.clusterID, 10), + utils.SchedulingServiceName, s.cfg.AdvertiseListenAddr, serializedEntry, discovery.DefaultLeaseInSeconds) + if err := s.serviceRegister.Register(); err != nil { + log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err)) + return err + } + atomic.StoreInt64(&s.isRunning, 1) + return nil +} + +// CreateServer creates the Server +func CreateServer(ctx context.Context, cfg *Config) *Server { + svr := &Server{ + DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), + startTimestamp: time.Now().Unix(), + cfg: cfg, + ctx: ctx, + } + return svr +} + +// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server +func CreateServerWrapper(cmd *cobra.Command, args []string) { + cmd.Flags().Parse(args) + cfg := NewConfig() + flagSet := cmd.Flags() + err := cfg.Parse(flagSet) + defer logutil.LogPanic() + + if err != nil { + cmd.Println(err) + return + } + + if printVersion, err := flagSet.GetBool("version"); err != nil { + cmd.Println(err) + return + } else if printVersion { + versioninfo.Print() + exit(0) + } + + // New zap logger + err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) + if err == nil { + log.ReplaceGlobals(cfg.Logger, cfg.LogProps) + } else { + log.Fatal("initialize logger error", errs.ZapError(err)) + } + // Flushing any buffered log entries + defer log.Sync() + + versioninfo.Log("Scheduling") + log.Info("Scheduling config", 
zap.Reflect("config", cfg)) + + grpcprometheus.EnableHandlingTimeHistogram() + metricutil.Push(&cfg.Metric) + + ctx, cancel := context.WithCancel(context.Background()) + svr := CreateServer(ctx, cfg) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Run(); err != nil { + log.Fatal("run server failed", errs.ZapError(err)) + } + + <-ctx.Done() + log.Info("got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func exit(code int) { + log.Sync() + os.Exit(code) +} diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index 26c0f369819..259b7dc166f 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -57,6 +57,8 @@ const ( TSOServiceName = "tso" // ResourceManagerServiceName is the name of resource manager server. ResourceManagerServiceName = "resource_manager" + // SchedulingServiceName is the name of scheduling server. + SchedulingServiceName = "scheduling" // KeyspaceGroupsKey is the path component of keyspace groups. KeyspaceGroupsKey = "keyspace_groups" // KeyspaceGroupsPrimaryKey is the path component of primary for keyspace groups. diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 351c7885895..4f93a3c5d74 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -253,6 +253,12 @@ func ResourceManagerSvcRootPath(clusterID uint64) string { return svcRootPath(clusterID, utils.ResourceManagerServiceName) } +// SchedulingSvcRootPath returns the root path of scheduling service. +// Path: /ms/{cluster_id}/scheduling +func SchedulingSvcRootPath(clusterID uint64) string { + return svcRootPath(clusterID, utils.SchedulingServiceName) +} + // TSOSvcRootPath returns the root path of tso service. // Path: /ms/{cluster_id}/tso func TSOSvcRootPath(clusterID uint64) string { diff --git a/pkg/swaggerserver/swaggerserver.go b/pkg/swaggerserver/swaggerserver.go index 778844adbde..d68bab06eb2 100644 --- a/pkg/swaggerserver/swaggerserver.go +++ b/pkg/swaggerserver/swaggerserver.go @@ -12,12 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build swagger_server +// +build swagger_server + package swaggerserver import ( "context" "net/http" + httpSwagger "github.com/swaggo/http-swagger" + _ "github.com/tikv/pd/docs/swagger" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/server" ) @@ -33,9 +38,14 @@ var ( } ) +// Enabled return true if swagger server is disabled. +func Enabled() bool { + return true +} + // NewHandler creates a HTTP handler for Swagger. func NewHandler(context.Context, *server.Server) (http.Handler, apiutil.APIServiceGroup, error) { swaggerHandler := http.NewServeMux() - swaggerHandler.Handle(swaggerPrefix, handler()) + swaggerHandler.Handle(swaggerPrefix, httpSwagger.Handler()) return swaggerHandler, swaggerServiceGroup, nil } diff --git a/pkg/swaggerserver/empty_handler.go b/pkg/swaggerserver/swaggerserver_disable.go similarity index 61% rename from pkg/swaggerserver/empty_handler.go rename to pkg/swaggerserver/swaggerserver_disable.go index 79f33a9af6b..c3b861b3b6c 100644 --- a/pkg/swaggerserver/empty_handler.go +++ b/pkg/swaggerserver/swaggerserver_disable.go @@ -1,4 +1,4 @@ -// Copyright 2020 TiKV Project Authors. +// Copyright 2023 TiKV Project Authors. 
// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,12 +18,19 @@ package swaggerserver import ( - "io" + "context" "net/http" + + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/server" ) -func handler() http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, _ = io.WriteString(w, "Swagger UI is not built. Try `make` with `SWAGGER=1`.\n") - }) +// Enabled return false if swagger server is disabled. +func Enabled() bool { + return false +} + +// NewHandler creates a HTTP handler for Swagger. +func NewHandler(context.Context, *server.Server) (http.Handler, apiutil.APIServiceGroup, error) { + return nil, apiutil.APIServiceGroup{}, nil } diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 600c46348a4..2ffa802b4f3 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -151,7 +151,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} guid := uuid.New().String() - tsoServiceKey := discovery.ServicePath(guid, "tso") + "/" + tsoServiceKey := discovery.ServicePath(guid, "tso") legacySvcRootPath := path.Join("/pd", guid) tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, guid, "tso") electionNamePrefix := "tso-server-" + guid @@ -817,7 +817,7 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( cfg *TestServiceConfig, ) *KeyspaceGroupManager { tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()} - tsoServiceKey := discovery.ServicePath(uniqueStr, "tso") + "/" + tsoServiceKey := discovery.ServicePath(uniqueStr, "tso") legacySvcRootPath := path.Join("/pd", uniqueStr) tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, uniqueStr, "tso") electionNamePrefix := "kgm-test-" + cfg.GetAdvertiseListenAddr() diff --git a/server/api/hot_status.go b/server/api/hot_status.go index 0296155596b..749a371300a 100644 --- a/server/api/hot_status.go +++ b/server/api/hot_status.go @@ -210,7 +210,7 @@ func (h *hotStatusHandler) GetHotBuckets(w http.ResponseWriter, r *http.Request) ids[i] = id } } - stats := h.Handler.GetHotBuckets() + stats := h.Handler.GetHotBuckets(ids...) ret := HotBucketsResponse{} for regionID, stats := range stats { ret[regionID] = make([]*HotBucketsItem, len(stats)) diff --git a/server/apiv2/handlers/keyspace.go b/server/apiv2/handlers/keyspace.go index 2568a3c744d..b93dc84faf8 100644 --- a/server/apiv2/handlers/keyspace.go +++ b/server/apiv2/handlers/keyspace.go @@ -110,6 +110,20 @@ func LoadKeyspace(c *gin.Context) { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return } + if value, ok := c.GetQuery("force_refresh_group_id"); ok && value == "true" { + groupManager := svr.GetKeyspaceGroupManager() + if groupManager == nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, managerUninitializedErr) + return + } + // keyspace has been checked in LoadKeyspace, so no need to check again. 
+ groupID, err := groupManager.GetGroupByKeyspaceID(meta.GetId()) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + meta.Config[keyspace.TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(groupID), 10) + } c.IndentedJSON(http.StatusOK, &KeyspaceMeta{meta}) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 143c7ceb5e4..84dfd3cecfe 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -606,6 +606,7 @@ func (c *RaftCluster) runUpdateStoreStats() { } } +// runCoordinator runs the main scheduling loop. func (c *RaftCluster) runCoordinator() { defer logutil.LogPanic() defer c.wg.Done() @@ -892,7 +893,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() // c.limiter is nil before "start" is called - if c.limiter != nil && c.opt.GetStoreLimitMode() == "auto" { + if c.limiter != nil { c.limiter.Collect(newStore.GetStoreStats()) } diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 55713a9e47b..c2ff966f228 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -1348,7 +1348,6 @@ func TestSyncConfig(t *testing.T) { if v.updated { re.True(switchRaftV2) tc.opt.UseRaftV2() - re.EqualValues(0, tc.opt.GetMaxMergeRegionSize()) re.EqualValues(512, tc.opt.GetMaxMovableHotPeerSize()) success, switchRaftV2 = syncConfig(tc.storeConfigManager, tc.GetStores()) re.True(success) diff --git a/server/config/config.go b/server/config/config.go index 31777cefa21..5bda5bdc9f9 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -666,14 +666,6 @@ type ScheduleConfig struct { // Only used to display SchedulersPayload map[string]interface{} `toml:"schedulers-payload" json:"schedulers-payload"` - // StoreLimitMode can be auto or manual, when set to auto, - // PD tries to change the store limit values according to - // the load state of the cluster dynamically. User can - // overwrite the auto-tuned value by pd-ctl, when the value - // is overwritten, the value is fixed until it is deleted. - // Default: manual - StoreLimitMode string `toml:"store-limit-mode" json:"store-limit-mode"` - // Controls the time interval between write hot regions info into leveldb. 
HotRegionsWriteInterval typeutil.Duration `toml:"hot-regions-write-interval" json:"hot-regions-write-interval"` @@ -746,7 +738,6 @@ const ( defaultHotRegionCacheHitsThreshold = 3 defaultSchedulerMaxWaitingOperator = 5 defaultLeaderSchedulePolicy = "count" - defaultStoreLimitMode = "manual" defaultEnableJointConsensus = true defaultEnableTiKVSplitRegion = true defaultEnableCrossTableMerge = true @@ -806,10 +797,6 @@ func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool) if !meta.IsDefined("leader-schedule-policy") { configutil.AdjustString(&c.LeaderSchedulePolicy, defaultLeaderSchedulePolicy) } - if !meta.IsDefined("store-limit-mode") { - configutil.AdjustString(&c.StoreLimitMode, defaultStoreLimitMode) - } - if !meta.IsDefined("store-limit-version") { configutil.AdjustString(&c.StoreLimitVersion, defaultStoreLimitVersion) } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 81484e12607..d308f6338a2 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -202,13 +202,7 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) { } // UseRaftV2 set some config for raft store v2 by default temporary. -// todo: remove this after raft store support this. -// disable merge check -func (o *PersistOptions) UseRaftV2() { - v := o.GetScheduleConfig().Clone() - v.MaxMergeRegionSize = 0 - o.SetScheduleConfig(v) -} +func (o *PersistOptions) UseRaftV2() {} const ( maxSnapshotCountKey = "schedule.max-snapshot-count" @@ -507,11 +501,6 @@ func (o *PersistOptions) GetAllStoresLimit() map[uint64]StoreLimitConfig { return o.GetScheduleConfig().StoreLimit } -// GetStoreLimitMode returns the limit mode of store. -func (o *PersistOptions) GetStoreLimitMode() string { - return o.GetScheduleConfig().StoreLimitMode -} - // GetStoreLimitVersion returns the limit version of store. func (o *PersistOptions) GetStoreLimitVersion() string { return o.GetScheduleConfig().StoreLimitVersion diff --git a/server/keyspace_service.go b/server/keyspace_service.go index c5fa7377178..287e4d3fe96 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -141,3 +141,21 @@ func (s *KeyspaceServer) UpdateKeyspaceState(_ context.Context, request *keyspac Keyspace: meta, }, nil } + +// GetAllKeyspaces get all keyspace's metadata. 
+func (s *KeyspaceServer) GetAllKeyspaces(_ context.Context, request *keyspacepb.GetAllKeyspacesRequest) (*keyspacepb.GetAllKeyspacesResponse, error) { + if err := s.validateRequest(request.GetHeader()); err != nil { + return nil, err + } + + manager := s.GetKeyspaceManager() + keyspaces, err := manager.LoadRangeKeyspace(request.StartId, int(request.Limit)) + if err != nil { + return &keyspacepb.GetAllKeyspacesResponse{Header: s.getErrorHeader(err)}, nil + } + + return &keyspacepb.GetAllKeyspacesResponse{ + Header: s.header(), + Keyspaces: keyspaces, + }, nil +} diff --git a/tests/cluster.go b/tests/cluster.go index bfdf7acf80d..28506858f0c 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -97,7 +97,10 @@ func createTestServer(ctx context.Context, cfg *config.Config, services []string if err != nil { return nil, err } - serviceBuilders := []server.HandlerBuilder{api.NewHandler, apiv2.NewV2Handler, swaggerserver.NewHandler, autoscaling.NewHandler} + serviceBuilders := []server.HandlerBuilder{api.NewHandler, apiv2.NewV2Handler, autoscaling.NewHandler} + if swaggerserver.Enabled() { + serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler) + } serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...) svr, err := server.CreateServer(ctx, cfg, services, serviceBuilders...) if err != nil { @@ -628,9 +631,11 @@ func (c *TestCluster) WaitLeader(ops ...WaitOption) string { counter := make(map[string]int) running := 0 for _, s := range c.servers { + s.RLock() if s.state == Running { running++ } + s.RUnlock() n := s.GetLeader().GetName() if n != "" { counter[n]++ diff --git a/tests/integrations/client/Makefile b/tests/integrations/client/Makefile index 71f6297270c..2d0cf748599 100644 --- a/tests/integrations/client/Makefile +++ b/tests/integrations/client/Makefile @@ -18,9 +18,12 @@ PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) static: install-tools + @ echo "gofmt ..." @ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }' - @ golangci-lint run ./... - @ revive -formatter friendly -config $(ROOT_PATH)/revive.toml . + @ echo "golangci-lint ..." + @ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... + @ echo "revive ..." + @ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./... 
tidy: @ go mod tidy diff --git a/tests/integrations/client/go.mod b/tests/integrations/client/go.mod index a5ebdaa3a8f..b7b3b96d034 100644 --- a/tests/integrations/client/go.mod +++ b/tests/integrations/client/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/docker/go-units v0.4.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 + github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/client/go.sum b/tests/integrations/client/go.sum index e4566aa4c6c..56b1aab028f 100644 --- a/tests/integrations/client/go.sum +++ b/tests/integrations/client/go.sum @@ -387,8 +387,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 h1:VM6INL8StTPYMKufyHRX2hPUMP7isHnkYvtRMA7Sdsc= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/tests/integrations/client/keyspace_test.go b/tests/integrations/client/keyspace_test.go index 28a9ceed116..df396808d8e 100644 --- a/tests/integrations/client/keyspace_test.go +++ b/tests/integrations/client/keyspace_test.go @@ -16,6 +16,7 @@ package client_test import ( "fmt" + "math" "time" "github.com/pingcap/kvproto/pkg/keyspacepb" @@ -27,16 +28,17 @@ import ( ) const ( - testConfig1 = "config_entry_1" - testConfig2 = "config_entry_2" + testConfig1 = "config_entry_1" + testConfig2 = "config_entry_2" + testKeyspaceCount = 10 ) -func mustMakeTestKeyspaces(re *require.Assertions, server *server.Server, start, count int) []*keyspacepb.KeyspaceMeta { +func mustMakeTestKeyspaces(re *require.Assertions, server *server.Server, start int) []*keyspacepb.KeyspaceMeta { now := time.Now().Unix() var err error - keyspaces := make([]*keyspacepb.KeyspaceMeta, count) + keyspaces := make([]*keyspacepb.KeyspaceMeta, testKeyspaceCount) manager := server.GetKeyspaceManager() - for i := 0; i < count; i++ { + for i := 0; i < testKeyspaceCount; i++ { keyspaces[i], err = manager.CreateKeyspace(&keyspace.CreateKeyspaceRequest{ Name: fmt.Sprintf("test_keyspace_%d", start+i), Config: map[string]string{ @@ -53,7 +55,7 @@ func mustMakeTestKeyspaces(re *require.Assertions, server *server.Server, start, func (suite *clientTestSuite) TestLoadKeyspace() { re := suite.Require() - metas := mustMakeTestKeyspaces(re, 
suite.srv, 0, 10) + metas := mustMakeTestKeyspaces(re, suite.srv, 0) for _, expected := range metas { loaded, err := suite.client.LoadKeyspace(suite.ctx, expected.GetName()) re.NoError(err) @@ -69,54 +71,31 @@ func (suite *clientTestSuite) TestLoadKeyspace() { re.Equal(utils.DefaultKeyspaceName, keyspaceDefault.GetName()) } -func (suite *clientTestSuite) TestWatchKeyspaces() { +func (suite *clientTestSuite) TestGetAllKeyspaces() { re := suite.Require() - initialKeyspaces := mustMakeTestKeyspaces(re, suite.srv, 10, 10) - watchChan, err := suite.client.WatchKeyspaces(suite.ctx) - re.NoError(err) - // First batch of watchChan message should contain all existing keyspaces. - initialLoaded := <-watchChan - for i := range initialKeyspaces { - re.Contains(initialLoaded, initialKeyspaces[i]) + metas := mustMakeTestKeyspaces(re, suite.srv, 20) + for _, expected := range metas { + loaded, err := suite.client.LoadKeyspace(suite.ctx, expected.GetName()) + re.NoError(err) + re.Equal(expected, loaded) } - // Each additional message contains extra put events. - additionalKeyspaces := mustMakeTestKeyspaces(re, suite.srv, 30, 10) + // Get all keyspaces. + resKeyspaces, err := suite.client.GetAllKeyspaces(suite.ctx, 1, math.MaxUint32) re.NoError(err) - // Checks that all additional keyspaces are captured by watch channel. - for i := 0; i < 10; { - loadedKeyspaces := <-watchChan - re.NotEmpty(loadedKeyspaces) - for j := range loadedKeyspaces { - re.Equal(additionalKeyspaces[i+j], loadedKeyspaces[j]) + re.Equal(len(metas), len(resKeyspaces)) + // Check expected keyspaces all in resKeyspaces. + for _, expected := range metas { + var isExists bool + for _, resKeyspace := range resKeyspaces { + if expected.GetName() == resKeyspace.GetName() { + isExists = true + continue + } + } + if !isExists { + re.Fail("not exists keyspace") } - i += len(loadedKeyspaces) } - // Updates to state should also be captured. - expected, err := suite.srv.GetKeyspaceManager().UpdateKeyspaceState(initialKeyspaces[0].Name, keyspacepb.KeyspaceState_DISABLED, time.Now().Unix()) - re.NoError(err) - loaded := <-watchChan - re.Equal([]*keyspacepb.KeyspaceMeta{expected}, loaded) - // Updates to config should also be captured. - expected, err = suite.srv.GetKeyspaceManager().UpdateKeyspaceConfig(initialKeyspaces[0].Name, []*keyspace.Mutation{ - { - Op: keyspace.OpDel, - Key: testConfig1, - }, - }) - re.NoError(err) - loaded = <-watchChan - re.Equal([]*keyspacepb.KeyspaceMeta{expected}, loaded) - // Updates to default keyspace's config should also be captured. - expected, err = suite.srv.GetKeyspaceManager().UpdateKeyspaceConfig(utils.DefaultKeyspaceName, []*keyspace.Mutation{ - { - Op: keyspace.OpPut, - Key: "config", - Value: "value", - }, - }) - re.NoError(err) - loaded = <-watchChan - re.Equal([]*keyspacepb.KeyspaceMeta{expected}, loaded) } func mustCreateKeyspaceAtState(re *require.Assertions, server *server.Server, index int, state keyspacepb.KeyspaceState) *keyspacepb.KeyspaceMeta { diff --git a/tests/integrations/mcs/Makefile b/tests/integrations/mcs/Makefile index 2628ea9e437..11862fc9e6c 100644 --- a/tests/integrations/mcs/Makefile +++ b/tests/integrations/mcs/Makefile @@ -18,9 +18,12 @@ PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) static: install-tools + @ echo "gofmt ..." @ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }' - @ golangci-lint run ./... - @ revive -formatter friendly -config $(ROOT_PATH)/revive.toml . + @ echo "golangci-lint ..." 
+ @ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... + @ echo "revive ..." + @ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./... tidy: @ go mod tidy diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 9ae99f0aba6..2c9cf02b3e5 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -12,7 +12,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 + github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/stretchr/testify v1.8.2 github.com/tikv/pd v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/mcs/go.sum b/tests/integrations/mcs/go.sum index 7b876f7a63e..312a3d5548c 100644 --- a/tests/integrations/mcs/go.sum +++ b/tests/integrations/mcs/go.sum @@ -388,8 +388,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 h1:VM6INL8StTPYMKufyHRX2hPUMP7isHnkYvtRMA7Sdsc= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 064d0bfe80e..d393dfe03ad 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -776,6 +776,22 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } }, }, + {"default", rmpb.GroupMode_RUMode, false, true, + `{"name":"default","mode":1,"r_u_settings":{"r_u":{"settings":{"fill_rate":10000,"burst_limit":-1},"state":{"initialized":false}}},"priority":0,"background_settings":{"job_types":["br"]}}`, + func(gs *rmpb.ResourceGroup) { + gs.RUSettings = &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + BurstLimit: -1, + }, + }, + } + gs.BackgroundSettings = &rmpb.BackgroundSettings{ + JobTypes: []string{"br"}, + } + }, + }, } checkErr := func(err error, success bool) { @@ -940,6 +956,25 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { re.Equal(1, len(groups1)) } } + + // test restart cluster + groups, err := cli.ListResourceGroups(suite.ctx) + re.NoError(err) + servers := suite.cluster.GetServers() + re.NoError(suite.cluster.StopAll()) + serverList 
:= make([]*tests.TestServer, 0, len(servers)) + for _, s := range servers { + serverList = append(serverList, s) + } + re.NoError(suite.cluster.RunServers(serverList)) + suite.cluster.WaitLeader() + var newGroups []*rmpb.ResourceGroup + testutil.Eventually(suite.Require(), func() bool { + var err error + newGroups, err = cli.ListResourceGroups(suite.ctx) + return err == nil + }, testutil.WithWaitFor(time.Second)) + re.Equal(groups, newGroups) } func (suite *resourceManagerClientTestSuite) TestResourceManagerClientFailover() { @@ -1046,7 +1081,7 @@ func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() { re.NoError(err) config := ctr.GetConfig() re.NotNil(config) - expectedConfig := controller.DefaultConfig() + expectedConfig := controller.DefaultRUConfig() re.Equal(expectedConfig.ReadBaseCost, config.ReadBaseCost) re.Equal(expectedConfig.ReadBytesCost, config.ReadBytesCost) re.Equal(expectedConfig.WriteBaseCost, config.WriteBaseCost) @@ -1064,9 +1099,9 @@ func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() { re.NoError(err) config = ctr.GetConfig() re.NotNil(config) - controllerConfig := controller.DefaultControllerConfig() + controllerConfig := controller.DefaultConfig() controllerConfig.RequestUnit = *ruConfig - expectedConfig = controller.GenerateConfig(controllerConfig) + expectedConfig = controller.GenerateRUConfig(controllerConfig) re.Equal(expectedConfig.ReadBaseCost, config.ReadBaseCost) re.Equal(expectedConfig.ReadBytesCost, config.ReadBytesCost) re.Equal(expectedConfig.WriteBaseCost, config.WriteBaseCost) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index b7c3873527e..ec9322cd1ec 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -427,8 +427,6 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient( for { select { case <-ctx.Done(): - // Make sure at least one TSO request is successful. - re.NotEmpty(lastPhysical) return default: } diff --git a/tests/integrations/mcs/tso/proxy_test.go b/tests/integrations/mcs/tso/proxy_test.go index 45646dfa48e..625a702ad39 100644 --- a/tests/integrations/mcs/tso/proxy_test.go +++ b/tests/integrations/mcs/tso/proxy_test.go @@ -79,7 +79,7 @@ func (s *tsoProxyTestSuite) SetupSuite() { } // Create some TSO client streams with different context. - s.streams, s.cleanupFuncs = createTSOStreams(re, s.ctx, s.backendEndpoints, 200) + s.streams, s.cleanupFuncs = createTSOStreams(s.ctx, re, s.backendEndpoints, 200) } func (s *tsoProxyTestSuite) TearDownSuite() { @@ -107,7 +107,7 @@ func (s *tsoProxyTestSuite) TestTSOProxyWorksWithCancellation() { go func() { defer wg.Done() for i := 0; i < 3; i++ { - streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 10) + streams, cleanupFuncs := createTSOStreams(s.ctx, re, s.backendEndpoints, 10) for j := 0; j < 10; j++ { s.verifyTSOProxy(s.ctx, streams, cleanupFuncs, 10, true) } @@ -148,7 +148,7 @@ func TestTSOProxyStress(t *testing.T) { log.Info("start a new round of stress test", zap.Int("round-id", i), zap.Int("clients-count", len(streams)+clientsIncr)) streamsTemp, cleanupFuncsTemp := - createTSOStreams(re, s.ctx, s.backendEndpoints, clientsIncr) + createTSOStreams(s.ctx, re, s.backendEndpoints, clientsIncr) streams = append(streams, streamsTemp...) cleanupFuncs = append(cleanupFuncs, cleanupFuncsTemp...) 
s.verifyTSOProxy(ctxTimeout, streams, cleanupFuncs, 50, false) @@ -201,7 +201,7 @@ func (s *tsoProxyTestSuite) TestTSOProxyRecvFromClientTimeout() { // Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second. re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout", `return(1)`)) - streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1) + streams, cleanupFuncs := createTSOStreams(s.ctx, re, s.backendEndpoints, 1) // Sleep 2 seconds to make the TSO Proxy's grpc stream timeout on the server side. time.Sleep(2 * time.Second) err := streams[0].Send(s.defaultReq) @@ -220,7 +220,7 @@ func (s *tsoProxyTestSuite) TestTSOProxyFailToSendToClient() { // Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second. re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyFailToSendToClient", `return(true)`)) - streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1) + streams, cleanupFuncs := createTSOStreams(s.ctx, re, s.backendEndpoints, 1) err := streams[0].Send(s.defaultReq) re.NoError(err) _, err = streams[0].Recv() @@ -238,7 +238,7 @@ func (s *tsoProxyTestSuite) TestTSOProxySendToTSOTimeout() { // Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second. re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxySendToTSOTimeout", `return(true)`)) - streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1) + streams, cleanupFuncs := createTSOStreams(s.ctx, re, s.backendEndpoints, 1) err := streams[0].Send(s.defaultReq) re.NoError(err) _, err = streams[0].Recv() @@ -256,7 +256,7 @@ func (s *tsoProxyTestSuite) TestTSOProxyRecvFromTSOTimeout() { // Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second. re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromTSOTimeout", `return(true)`)) - streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1) + streams, cleanupFuncs := createTSOStreams(s.ctx, re, s.backendEndpoints, 1) err := streams[0].Send(s.defaultReq) re.NoError(err) _, err = streams[0].Recv() @@ -369,7 +369,7 @@ func (s *tsoProxyTestSuite) generateRequests(requestsPerClient int) []*pdpb.TsoR // createTSOStreams creates multiple TSO client streams, and each stream uses a different gRPC connection // to simulate multiple clients. func createTSOStreams( - re *require.Assertions, ctx context.Context, + ctx context.Context, re *require.Assertions, backendEndpoints string, clientCount int, ) ([]pdpb.PD_TsoClient, []testutil.CleanupFunc) { cleanupFuncs := make([]testutil.CleanupFunc, clientCount) @@ -475,7 +475,7 @@ func benchmarkTSOProxyNClients(clientCount int, b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - streams, cleanupFuncs := createTSOStreams(re, ctx, suite.backendEndpoints, clientCount) + streams, cleanupFuncs := createTSOStreams(ctx, re, suite.backendEndpoints, clientCount) // Benchmark TSO proxy b.ResetTimer() diff --git a/tests/integrations/tso/Makefile b/tests/integrations/tso/Makefile index 4573863039c..25896ca50e4 100644 --- a/tests/integrations/tso/Makefile +++ b/tests/integrations/tso/Makefile @@ -18,9 +18,12 @@ PATH := $(GO_TOOLS_BIN_PATH):$(PATH) SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash) static: install-tools + @ echo "gofmt ..." @ gofmt -s -l -d . 
2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }' - @ golangci-lint run ./... - @ revive -formatter friendly -config $(ROOT_PATH)/revive.toml . + @ echo "golangci-lint ..." + @ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... + @ echo "revive ..." + @ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./... tidy: @ go mod tidy @@ -28,11 +31,11 @@ tidy: git diff --quiet go.mod go.sum test: failpoint-enable - CGO_ENABLED=1 go test -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; } + CGO_ENABLED=1 go test ./... -v -tags deadlock -race -cover || { $(MAKE) failpoint-disable && exit 1; } $(MAKE) failpoint-disable ci-test-job: - CGO_ENABLED=1 go test -v -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=$(ROOT_PATH)/... github.com/tikv/pd/tests/integrations/tso + CGO_ENABLED=1 go test ./... -v -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=$(ROOT_PATH)/... github.com/tikv/pd/tests/integrations/tso install-tools: cd $(ROOT_PATH) && $(MAKE) install-tools diff --git a/tests/integrations/tso/go.mod b/tests/integrations/tso/go.mod index bd193317f63..c2631f6d49f 100644 --- a/tests/integrations/tso/go.mod +++ b/tests/integrations/tso/go.mod @@ -13,7 +13,7 @@ replace google.golang.org/grpc v1.54.0 => google.golang.org/grpc v1.26.0 require ( github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c - github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 + github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 github.com/stretchr/testify v1.8.4 github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 diff --git a/tests/integrations/tso/go.sum b/tests/integrations/tso/go.sum index b95a53d0a1f..9a88773ebbb 100644 --- a/tests/integrations/tso/go.sum +++ b/tests/integrations/tso/go.sum @@ -385,8 +385,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgWM9fSBIvaxsJHuGP0uM74HXtv3MyyGQ= github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 h1:VM6INL8StTPYMKufyHRX2hPUMP7isHnkYvtRMA7Sdsc= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 9634737e18b..5f446158fb7 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -87,7 +87,6 @@ func TestConfig(t *testing.T) { scheduleConfig.EnableMakeUpReplica = false scheduleConfig.EnableRemoveExtraReplica = false scheduleConfig.EnableLocationReplacement = false - scheduleConfig.StoreLimitMode = "" 
re.Equal(uint64(0), scheduleConfig.MaxMergeRegionKeys) // The result of config show doesn't be 0. scheduleConfig.MaxMergeRegionKeys = scheduleConfig.GetMaxMergeRegionKeys() diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 4e926687223..2dfa89acb52 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -283,6 +283,13 @@ func TestHotWithStoreID(t *testing.T) { re.Equal(buckets.GetStats().ReadKeys[0]/interval, item.ReadKeys) re.Equal(buckets.GetStats().WriteBytes[0]/interval, item.WriteBytes) re.Equal(buckets.GetStats().WriteKeys[0]/interval, item.WriteKeys) + + args = []string{"-u", pdAddr, "hot", "buckets", "2"} + output, err = pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + hotBuckets = api.HotBucketsResponse{} + re.NoError(json.Unmarshal(output, &hotBuckets)) + re.Nil(hotBuckets[2]) } func TestHistoryHotRegions(t *testing.T) { diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index 40315e835f8..1d0c8132c13 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -561,9 +561,8 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { args := []string{"-u", pdAddr, "keyspace-group"} output, err := pdctl.ExecuteCommand(cmd, append(args, "1")...) re.NoError(err) - err = json.Unmarshal(output, &keyspaceGroup) - re.NoError(err) + re.NoErrorf(err, "output: %s", string(output)) return len(keyspaceGroup.Members) == 2 }) for _, member := range keyspaceGroup.Members { diff --git a/tests/pdctl/keyspace/keyspace_test.go b/tests/pdctl/keyspace/keyspace_test.go new file mode 100644 index 00000000000..a0bab4114df --- /dev/null +++ b/tests/pdctl/keyspace/keyspace_test.go @@ -0,0 +1,103 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package keyspace_test + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/utils/testutil" + api "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/server/config" + "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" +) + +func TestKeyspace(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) + keyspaces := make([]string, 0) + for i := 1; i < 10; i++ { + keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) + } + tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.Keyspace.PreAlloc = keyspaces + }) + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + pdAddr := tc.GetConfig().GetClientURL() + + ttc, err := tests.NewTestTSOCluster(ctx, 2, pdAddr) + re.NoError(err) + defer ttc.Destroy() + cmd := pdctlCmd.GetRootCmd() + + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID) + + var k api.KeyspaceMeta + keyspaceName := "keyspace_1" + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace", keyspaceName} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &k)) + return k.GetName() == keyspaceName + }) + re.Equal(uint32(1), k.GetId()) + re.Equal(defaultKeyspaceGroupID, k.Config[keyspace.TSOKeyspaceGroupIDKey]) + + // split keyspace group. + newGroupID := "2" + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace-group", "split", "0", newGroupID, "1"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + return strings.Contains(string(output), "Success") + }) + + // check keyspace group in keyspace whether changed. + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace", keyspaceName} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + re.NoError(json.Unmarshal(output, &k)) + return newGroupID == k.Config[keyspace.TSOKeyspaceGroupIDKey] + }) + + // test error name + args := []string{"-u", pdAddr, "keyspace", "error_name"} + output, err := pdctl.ExecuteCommand(cmd, args...) 
+ re.NoError(err) + re.Contains(string(output), "Fail") + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) +} diff --git a/tests/server/api/api_test.go b/tests/server/api/api_test.go index 1e2a5531ba6..080eeb44b1b 100644 --- a/tests/server/api/api_test.go +++ b/tests/server/api/api_test.go @@ -377,6 +377,15 @@ func (suite *middlewareTestSuite) TestRateLimitMiddleware() { } } +func (suite *middlewareTestSuite) TestSwaggerUrl() { + leader := suite.cluster.GetServer(suite.cluster.GetLeader()) + req, _ := http.NewRequest(http.MethodGet, leader.GetAddr()+"/swagger/ui/index", nil) + resp, err := dialClient.Do(req) + suite.NoError(err) + suite.True(resp.StatusCode == http.StatusNotFound) + resp.Body.Close() +} + func (suite *middlewareTestSuite) TestAuditPrometheusBackend() { leader := suite.cluster.GetServer(suite.cluster.GetLeader()) input := map[string]interface{}{ diff --git a/tools/pd-ctl/pdctl/command/keyspace_command.go b/tools/pd-ctl/pdctl/command/keyspace_command.go new file mode 100644 index 00000000000..a68e2f05a80 --- /dev/null +++ b/tools/pd-ctl/pdctl/command/keyspace_command.go @@ -0,0 +1,48 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + "net/http" + + "github.com/spf13/cobra" +) + +const keyspacePrefix = "pd/api/v2/keyspaces" + +// NewKeyspaceCommand returns a keyspace subcommand of rootCmd. 
+func NewKeyspaceCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "keyspace [command] [flags]", + Short: "show keyspace information", + Run: showKeyspaceCommandFunc, + } + return cmd +} + +func showKeyspaceCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + cmd.Usage() + return + } + + resp, err := doRequest(cmd, fmt.Sprintf("%s/%s?force_refresh_group_id=true", keyspacePrefix, args[0]), http.MethodGet, http.Header{}) + if err != nil { + cmd.Printf("Failed to get the keyspace information: %s\n", err) + return + } + cmd.Println(resp) +} diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go index 8fc3a454d7a..86494c046eb 100644 --- a/tools/pd-ctl/pdctl/ctl.go +++ b/tools/pd-ctl/pdctl/ctl.go @@ -66,6 +66,7 @@ func GetRootCmd() *cobra.Command { command.NewCompletionCommand(), command.NewUnsafeCommand(), command.NewKeyspaceGroupCommand(), + command.NewKeyspaceCommand(), ) rootCmd.Flags().ParseErrorsWhitelist.UnknownFlags = true diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index 7e8996db186..893f446ddb5 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -864,8 +864,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2 h1:VM6INL8StTPYMKufyHRX2hPUMP7isHnkYvtRMA7Sdsc= -github.com/pingcap/kvproto v0.0.0-20230705080816-a48f4fe282a2/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1 h1:sC3XRNNBQNjFJGRtSzJRvqi2aDLFOsQoCHItr9rbbY8= +github.com/pingcap/kvproto v0.0.0-20230713060620-89756bd21be1/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
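Illustrative note (not part of the patch): the TestGetAllKeyspaces case added above drives the new GetAllKeyspaces client API end to end. A minimal sketch of how a caller might use the same API follows; it assumes a pd.Client that is already connected to the cluster (the same kind of client the integration test suite holds), and the package name, helper name, and timeout are invented for the example.

package example

import (
	"context"
	"fmt"
	"math"
	"time"

	pd "github.com/tikv/pd/client"
)

// dumpKeyspaces lists every keyspace PD knows about by calling the newly
// added GetAllKeyspaces API with the same arguments the integration test
// uses: start from keyspace ID 1 and place no practical limit on the count.
func dumpKeyspaces(cli pd.Client) error {
	// The 10-second timeout is an arbitrary choice for this sketch.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	keyspaces, err := cli.GetAllKeyspaces(ctx, 1, math.MaxUint32)
	if err != nil {
		return err
	}
	for _, meta := range keyspaces {
		fmt.Printf("keyspace %d: %s\n", meta.GetId(), meta.GetName())
	}
	return nil
}

On the command-line side, the new pd-ctl keyspace subcommand introduced in this diff queries a single keyspace through /pd/api/v2/keyspaces/{name} with force_refresh_group_id=true, so the returned config carries a refreshed keyspace.TSOKeyspaceGroupIDKey entry; this is what tests/pdctl/keyspace/keyspace_test.go asserts after the keyspace-group split.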