diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index df560ac19f1..0323c6b0b33 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -282,29 +282,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { for { select { - /* high priority */ - case <-c.lowTokenNotifyChan: - c.executeOnAllGroups((*groupCostController).updateRunState) - c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) - if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */) - } - if c.run.inDegradedMode { - c.executeOnAllGroups((*groupCostController).applyDegradedMode) - } - case resp := <-c.tokenResponseChan: - if resp != nil { - c.executeOnAllGroups((*groupCostController).updateRunState) - c.handleTokenBucketResponse(resp) - } - c.run.currentRequests = nil - case gc := <-c.tokenBucketUpdateChan: - go gc.handleTokenBucketUpdateEvent(c.loopCtx) - case <-c.responseDeadlineCh: - c.run.inDegradedMode = true - c.executeOnAllGroups((*groupCostController).applyDegradedMode) - log.Warn("[resource group controller] enter degraded mode") - /* tickers */ case <-cleanupTicker.C: c.cleanUpResourceGroup() @@ -339,6 +316,25 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case <-c.loopCtx.Done(): resourceGroupStatusGauge.Reset() return + case <-c.responseDeadlineCh: + c.run.inDegradedMode = true + c.executeOnAllGroups((*groupCostController).applyDegradedMode) + log.Warn("[resource group controller] enter degraded mode") + case resp := <-c.tokenResponseChan: + if resp != nil { + c.executeOnAllGroups((*groupCostController).updateRunState) + c.handleTokenBucketResponse(resp) + } + c.run.currentRequests = nil + case <-c.lowTokenNotifyChan: + c.executeOnAllGroups((*groupCostController).updateRunState) + c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) + if len(c.run.currentRequests) == 0 { + c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */) + } + if c.run.inDegradedMode { + c.executeOnAllGroups((*groupCostController).applyDegradedMode) + } case resp, ok := <-watchMetaChannel: failpoint.Inject("disableWatch", func() { if c.ruConfig.isSingleGroupByKeyspace { @@ -409,6 +405,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) } + case gc := <-c.tokenBucketUpdateChan: + go gc.handleTokenBucketUpdateEvent(c.loopCtx) } } }()