From 7ed32836f17e40ad0b8ebb9b0de3a5854c19e975 Mon Sep 17 00:00:00 2001 From: nolouch Date: Tue, 2 Jul 2024 16:02:36 +0800 Subject: [PATCH] resource_control: allow configuration of the maximum retry time for the local bucket Signed-off-by: nolouch --- client/resource_group/controller/config.go | 64 ++++++++++++++----- .../resource_group/controller/controller.go | 53 +++++++-------- pkg/mcs/resourcemanager/server/config.go | 8 +++ pkg/mcs/resourcemanager/server/config_test.go | 6 +- .../resourcemanager/resource_manager_test.go | 9 +++ 5 files changed, 98 insertions(+), 42 deletions(-) diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index a4176c073cc..9b7bdcbeeee 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -52,8 +52,10 @@ const ( defaultTargetPeriod = 5 * time.Second // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. defaultMaxWaitDuration = 30 * time.Second + // defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + defaultLTBTokenRPCMaxDelay = 1 * time.Second // defaultWaitRetryTimes is the times to retry when waiting for the token. - defaultWaitRetryTimes = 10 + defaultWaitRetryTimes = 20 // defaultWaitRetryInterval is the interval to retry when waiting for the token. defaultWaitRetryInterval = 50 * time.Millisecond ) @@ -77,23 +79,35 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. - defaultDegradedModeWaitDuration = 0 + defaultDegradedModeWaitDuration = time.Duration(0) defaultAvgBatchProportion = 0.7 ) -// Config is the configuration of the resource manager controller which includes some option for client needed. -type Config struct { +// LocalBucketRPCParams is the parameters for local bucket RPC. +type TokenRPCParams struct { + // WaitRetryInterval is the interval to retry when waiting for the token. + WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"` + + // WaitRetryTimes is the times to retry when waiting for the token. + WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"` +} + +// LocalBucketConfig is the configuration for local bucket. not export to server side. +type LocalBucketConfig struct { + TokenRPCParams `toml:"token-rpc-params" json:"token-rpc-params"` +} + +// BaseConfig is the configuration of the resource manager controller which includes some option for client needed. +// TODO: unified the configuration for client and server, server side in pkg/mcs/resourcemanger/config.go. +type BaseConfig struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` // LTBMaxWaitDuration is the max wait time duration for local token bucket. LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` - // WaitRetryInterval is the interval to retry when waiting for the token. - WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"` - - // WaitRetryTimes is the times to retry when waiting for the token. - WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"` + // LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + LTBTokenRPCMaxDelay Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"` // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. @@ -103,15 +117,35 @@ type Config struct { EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"` } +// Config is the configuration of the resource manager controller. +type Config struct { + BaseConfig + LocalBucketConfig +} + +// Adjust adjusts the configuration. +func (c *Config) Adjust() { + if int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration) != int(c.LocalBucketConfig.WaitRetryInterval.Duration)*c.LocalBucketConfig.WaitRetryTimes { + c.LocalBucketConfig.WaitRetryTimes = int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration / c.LocalBucketConfig.WaitRetryInterval.Duration) + } +} + // DefaultConfig returns the default resource manager controller configuration. func DefaultConfig() *Config { return &Config{ - DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), - LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), - WaitRetryInterval: NewDuration(defaultWaitRetryInterval), - WaitRetryTimes: defaultWaitRetryTimes, - RequestUnit: DefaultRequestUnitConfig(), - EnableControllerTraceLog: false, + BaseConfig: BaseConfig{ + DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), + RequestUnit: DefaultRequestUnitConfig(), + EnableControllerTraceLog: false, + LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), + LTBTokenRPCMaxDelay: NewDuration(defaultLTBTokenRPCMaxDelay), + }, + LocalBucketConfig: LocalBucketConfig{ + TokenRPCParams: TokenRPCParams{ + WaitRetryInterval: NewDuration(defaultWaitRetryInterval), + WaitRetryTimes: defaultWaitRetryTimes, + }, + }, } } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 1910e37eff8..df560ac19f1 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -192,6 +192,7 @@ func NewResourceGroupController( log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig)) controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} controller.safeRuConfig.Store(controller.ruConfig) + enableControllerTraceLog.Store(config.EnableControllerTraceLog) return controller, nil } @@ -200,12 +201,13 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con if err != nil { return nil, err } + config := DefaultConfig() + defer config.Adjust() kvs := resp.GetKvs() if len(kvs) == 0 { log.Warn("[resource group controller] server does not save config, load config failed") - return DefaultConfig(), nil + return config, nil } - config := DefaultConfig() err = json.Unmarshal(kvs[0].GetValue(), config) if err != nil { return nil, err @@ -280,6 +282,29 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { for { select { + /* high priority */ + case <-c.lowTokenNotifyChan: + c.executeOnAllGroups((*groupCostController).updateRunState) + c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) + if len(c.run.currentRequests) == 0 { + c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */) + } + if c.run.inDegradedMode { + c.executeOnAllGroups((*groupCostController).applyDegradedMode) + } + case resp := <-c.tokenResponseChan: + if resp != nil { + c.executeOnAllGroups((*groupCostController).updateRunState) + c.handleTokenBucketResponse(resp) + } + c.run.currentRequests = nil + case gc := <-c.tokenBucketUpdateChan: + go gc.handleTokenBucketUpdateEvent(c.loopCtx) + case <-c.responseDeadlineCh: + c.run.inDegradedMode = true + c.executeOnAllGroups((*groupCostController).applyDegradedMode) + log.Warn("[resource group controller] enter degraded mode") + /* tickers */ case <-cleanupTicker.C: c.cleanUpResourceGroup() @@ -308,32 +333,12 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchRetryTimer.Reset(watchRetryInterval) } } - case <-emergencyTokenAcquisitionTicker.C: c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) /* channels */ case <-c.loopCtx.Done(): resourceGroupStatusGauge.Reset() return - case <-c.responseDeadlineCh: - c.run.inDegradedMode = true - c.executeOnAllGroups((*groupCostController).applyDegradedMode) - log.Warn("[resource group controller] enter degraded mode") - case resp := <-c.tokenResponseChan: - if resp != nil { - c.executeOnAllGroups((*groupCostController).updateRunState) - c.handleTokenBucketResponse(resp) - } - c.run.currentRequests = nil - case <-c.lowTokenNotifyChan: - c.executeOnAllGroups((*groupCostController).updateRunState) - c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) - if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */) - } - if c.run.inDegradedMode { - c.executeOnAllGroups((*groupCostController).applyDegradedMode) - } case resp, ok := <-watchMetaChannel: failpoint.Inject("disableWatch", func() { if c.ruConfig.isSingleGroupByKeyspace { @@ -390,6 +395,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if err := json.Unmarshal(item.Kv.Value, config); err != nil { continue } + config.Adjust() c.ruConfig = GenerateRUConfig(config) // Stay compatible with serverless @@ -403,9 +409,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) } - - case gc := <-c.tokenBucketUpdateChan: - go gc.handleTokenBucketUpdateEvent(c.loopCtx) } } }() diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 83197b70f2e..2ccdfb05cc4 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -59,6 +59,8 @@ const ( defaultDegradedModeWaitDuration = time.Second * 0 // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. defaultMaxWaitDuration = 30 * time.Second + // defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + defaultLTBTokenRPCMaxDelay = 1 * time.Second ) // Config is the configuration for the resource manager. @@ -99,6 +101,9 @@ type ControllerConfig struct { // LTBMaxWaitDuration is the max wait time duration for local token bucket. LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + LTBTokenRPCMaxDelay typeutil.Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -119,6 +124,9 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) { if !meta.IsDefined("ltb-max-wait-duration") { configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) } + if !meta.IsDefined("ltb-token-rpc-max-delay") { + configutil.AdjustDuration(&rmc.LTBTokenRPCMaxDelay, defaultLTBTokenRPCMaxDelay) + } failpoint.Inject("enableDegradedMode", func() { configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second) }) diff --git a/pkg/mcs/resourcemanager/server/config_test.go b/pkg/mcs/resourcemanager/server/config_test.go index 2d57100468e..ae9dfc2cad3 100644 --- a/pkg/mcs/resourcemanager/server/config_test.go +++ b/pkg/mcs/resourcemanager/server/config_test.go @@ -28,6 +28,7 @@ func TestControllerConfig(t *testing.T) { cfgData := ` [controller] ltb-max-wait-duration = "60s" +ltb-token-rpc-max-delay = "500ms" degraded-mode-wait-duration = "2s" [controller.request-unit] read-base-cost = 1.0 @@ -42,8 +43,9 @@ read-cpu-ms-cost = 5.0 err = cfg.Adjust(&meta) re.NoError(err) - re.Equal(time.Second*2, cfg.Controller.DegradedModeWaitDuration.Duration) - re.Equal(time.Second*60, cfg.Controller.LTBMaxWaitDuration.Duration) + re.Equal(2*time.Second, cfg.Controller.DegradedModeWaitDuration.Duration) + re.Equal(60*time.Second, cfg.Controller.LTBMaxWaitDuration.Duration) + re.Equal(500*time.Millisecond, cfg.Controller.LTBTokenRPCMaxDelay.Duration) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index ab7cd5321ad..c8cee89cf32 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -1433,12 +1433,14 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh configURL := "/resource-manager/api/v1/config/controller" waitDuration := 10 * time.Second + tokenRPCMaxDelay := 2 * time.Second readBaseCost := 1.5 defaultCfg := controller.DefaultConfig() expectCfg := server.ControllerConfig{ // failpoint enableDegradedMode will setup and set it be 1s. DegradedModeWaitDuration: typeutil.NewDuration(time.Second), LTBMaxWaitDuration: typeutil.Duration(defaultCfg.LTBMaxWaitDuration), + LTBTokenRPCMaxDelay: typeutil.Duration(defaultCfg.LTBTokenRPCMaxDelay), RequestUnit: server.RequestUnitConfig(defaultCfg.RequestUnit), EnableControllerTraceLog: defaultCfg.EnableControllerTraceLog, } @@ -1461,6 +1463,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh value: waitDuration, expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration }, }, + { + configJSON: fmt.Sprintf(`{"ltb-token-rpc-max-delay": "%v"}`, tokenRPCMaxDelay), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { + ruConfig.WaitRetryTimes = int(tokenRPCMaxDelay / ruConfig.WaitRetryInterval) + }, + }, { configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration), value: waitDuration,