diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index 5efab52fe68..47608203847 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -20,6 +20,8 @@ import ( "github.com/gogo/protobuf/proto" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/log" + "go.uber.org/zap" ) const ( @@ -31,6 +33,7 @@ const ( defaultReserveRatio = 0.5 defaultLoanCoefficient = 2 maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024 + slotExpireTimeout = 10 * time.Minute ) // GroupTokenBucket is a token bucket for a resource group. @@ -62,6 +65,7 @@ type TokenSlot struct { // tokenCapacity is the number of tokens in the slot. tokenCapacity float64 lastTokenCapacity float64 + lastReqTime time.Time } // GroupTokenBucketState is the running state of TokenBucket. @@ -75,7 +79,8 @@ type GroupTokenBucketState struct { LastUpdate *time.Time `json:"last_update,omitempty"` Initialized bool `json:"initialized"` // settingChanged is used to avoid that the number of tokens returned is jitter because of changing fill rate. - settingChanged bool + settingChanged bool + lastCheckExpireSlot time.Time } // Clone returns the copy of GroupTokenBucketState @@ -95,6 +100,7 @@ func (gts *GroupTokenBucketState) Clone() *GroupTokenBucketState { Initialized: gts.Initialized, tokenSlots: tokenSlots, clientConsumptionTokensSum: gts.clientConsumptionTokensSum, + lastCheckExpireSlot: gts.lastCheckExpireSlot, } } @@ -119,16 +125,18 @@ func (gts *GroupTokenBucketState) balanceSlotTokens( clientUniqueID uint64, settings *rmpb.TokenLimitSettings, requiredToken, elapseTokens float64) { + now := time.Now() slot, exist := gts.tokenSlots[clientUniqueID] if !exist { // Only slots that require a positive number will be considered alive, // but still need to allocate the elapsed tokens as well. if requiredToken != 0 { - slot = &TokenSlot{} + slot = &TokenSlot{lastReqTime: now} gts.tokenSlots[clientUniqueID] = slot gts.clientConsumptionTokensSum = 0 } } else { + slot.lastReqTime = now if gts.clientConsumptionTokensSum >= maxAssignTokens { gts.clientConsumptionTokensSum = 0 } @@ -139,6 +147,16 @@ func (gts *GroupTokenBucketState) balanceSlotTokens( } } + if time.Since(gts.lastCheckExpireSlot) >= slotExpireTimeout { + gts.lastCheckExpireSlot = now + for clientUniqueID, slot := range gts.tokenSlots { + if time.Since(slot.lastReqTime) >= slotExpireTimeout { + delete(gts.tokenSlots, clientUniqueID) + log.Info("delete resource group slot because expire", zap.Time("last-req-time", slot.lastReqTime), + zap.Any("expire timeout", slotExpireTimeout), zap.Any("del client id", clientUniqueID), zap.Any("len", len(gts.tokenSlots))) + } + } + } if len(gts.tokenSlots) == 0 { return } @@ -264,6 +282,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) { lastTokenCapacity: gtb.Tokens, } gtb.LastUpdate = &now + gtb.lastCheckExpireSlot = now gtb.Initialized = true }