Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs/resourcemanager: delete expire tokenSlot (#7344) #7349

Merged
Merged
23 changes: 21 additions & 2 deletions pkg/mcs/resource_manager/server/token_bukets.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

"github.com/gogo/protobuf/proto"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"go.uber.org/zap"
)

const (
Expand All @@ -31,6 +33,7 @@
defaultReserveRatio = 0.5
defaultLoanCoefficient = 2
maxAssignTokens = math.MaxFloat64 / 1024 // assume max client connect is 1024
slotExpireTimeout = 10 * time.Minute
)

// GroupTokenBucket is a token bucket for a resource group.
Expand Down Expand Up @@ -62,6 +65,7 @@
// tokenCapacity is the number of tokens in the slot.
tokenCapacity float64
lastTokenCapacity float64
lastReqTime time.Time
}

// GroupTokenBucketState is the running state of TokenBucket.
Expand All @@ -75,7 +79,8 @@
LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
// settingChanged is used to avoid that the number of tokens returned is jitter because of changing fill rate.
settingChanged bool
settingChanged bool
lastCheckExpireSlot time.Time
}

// Clone returns the copy of GroupTokenBucketState
Expand All @@ -95,6 +100,7 @@
Initialized: gts.Initialized,
tokenSlots: tokenSlots,
clientConsumptionTokensSum: gts.clientConsumptionTokensSum,
lastCheckExpireSlot: gts.lastCheckExpireSlot,
}
}

Expand All @@ -119,16 +125,18 @@
clientUniqueID uint64,
settings *rmpb.TokenLimitSettings,
requiredToken, elapseTokens float64) {
now := time.Now()
slot, exist := gts.tokenSlots[clientUniqueID]
if !exist {
// Only slots that require a positive number will be considered alive,
// but still need to allocate the elapsed tokens as well.
if requiredToken != 0 {
slot = &TokenSlot{}
slot = &TokenSlot{lastReqTime: now}
gts.tokenSlots[clientUniqueID] = slot
gts.clientConsumptionTokensSum = 0
}
} else {
slot.lastReqTime = now
if gts.clientConsumptionTokensSum >= maxAssignTokens {
gts.clientConsumptionTokensSum = 0
}
Expand All @@ -139,6 +147,16 @@
}
}

if time.Since(gts.lastCheckExpireSlot) >= slotExpireTimeout {
gts.lastCheckExpireSlot = now
for clientUniqueID, slot := range gts.tokenSlots {
if time.Since(slot.lastReqTime) >= slotExpireTimeout {
delete(gts.tokenSlots, clientUniqueID)
log.Info("delete resource group slot because expire", zap.Time("last-req-time", slot.lastReqTime),
zap.Any("expire timeout", slotExpireTimeout), zap.Any("del client id", clientUniqueID), zap.Any("len", len(gts.tokenSlots)))

Check warning on line 156 in pkg/mcs/resource_manager/server/token_bukets.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resource_manager/server/token_bukets.go#L154-L156

Added lines #L154 - L156 were not covered by tests
}
}
}
if len(gts.tokenSlots) == 0 {
return
}
Expand Down Expand Up @@ -264,6 +282,7 @@
lastTokenCapacity: gtb.Tokens,
}
gtb.LastUpdate = &now
gtb.lastCheckExpireSlot = now
gtb.Initialized = true
}

Expand Down
Loading