-
Notifications
You must be signed in to change notification settings - Fork 446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rolling window limit support #193
Changes from 25 commits
84e3323
8f517f4
b1aadd1
f2c6324
ab6f360
e516c20
4a4c9cc
b69f04d
d72f951
e5704b6
b07edff
48446cf
927b23e
f0d3c8b
5e2676b
ffff44a
33f39f9
846ba8e
e4e25cc
244e801
510abf5
307c27d
122cbf5
953958e
baf012b
1face4c
5cc166b
8ce4708
d2d32b4
bc25eb7
10404f4
3ec9cca
73df311
a400046
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
package algorithm | ||
|
||
import ( | ||
"math" | ||
|
||
"github.com/coocood/freecache" | ||
pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" | ||
"github.com/envoyproxy/ratelimit/src/config" | ||
"github.com/envoyproxy/ratelimit/src/utils" | ||
logger "github.com/sirupsen/logrus" | ||
) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i would suggest to add a compile time check to both window implementations, to ensure that window type implements RatelimitAlgorithm interface. It's a useful technique in Go, eg: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is done |
||
var _ RatelimitAlgorithm = (*FixedWindowImpl)(nil) | ||
|
||
type FixedWindowImpl struct { | ||
timeSource utils.TimeSource | ||
cacheKeyGenerator utils.CacheKeyGenerator | ||
localCache *freecache.Cache | ||
nearLimitRatio float32 | ||
} | ||
|
||
func (fw *FixedWindowImpl) GetResponseDescriptorStatus(key string, limit *config.RateLimit, results int64, isOverLimitWithLocalCache bool, hitsAddend int64) *pb.RateLimitResponse_DescriptorStatus { | ||
if key == "" { | ||
return &pb.RateLimitResponse_DescriptorStatus{ | ||
Code: pb.RateLimitResponse_OK, | ||
CurrentLimit: nil, | ||
LimitRemaining: 0, | ||
} | ||
} | ||
if isOverLimitWithLocalCache { | ||
fw.PopulateStats(limit, 0, uint64(hitsAddend), uint64(hitsAddend)) | ||
return &pb.RateLimitResponse_DescriptorStatus{ | ||
Code: pb.RateLimitResponse_OVER_LIMIT, | ||
CurrentLimit: limit.Limit, | ||
LimitRemaining: 0, | ||
DurationUntilReset: utils.CalculateFixedReset(limit.Limit, fw.timeSource), | ||
} | ||
} | ||
|
||
isOverLimit, limitRemaining, durationUntilReset := fw.IsOverLimit(limit, int64(results), hitsAddend) | ||
|
||
if !isOverLimit { | ||
return &pb.RateLimitResponse_DescriptorStatus{ | ||
Code: pb.RateLimitResponse_OK, | ||
CurrentLimit: limit.Limit, | ||
LimitRemaining: uint32(limitRemaining), | ||
DurationUntilReset: utils.CalculateFixedReset(limit.Limit, fw.timeSource), | ||
} | ||
} else { | ||
if fw.localCache != nil { | ||
durationUntilReset = utils.MaxInt(1, durationUntilReset) | ||
|
||
err := fw.localCache.Set([]byte(key), []byte{}, durationUntilReset) | ||
if err != nil { | ||
logger.Errorf("Failing to set local cache key: %s", key) | ||
} | ||
} | ||
|
||
return &pb.RateLimitResponse_DescriptorStatus{ | ||
Code: pb.RateLimitResponse_OVER_LIMIT, | ||
CurrentLimit: limit.Limit, | ||
LimitRemaining: uint32(limitRemaining), | ||
DurationUntilReset: utils.CalculateFixedReset(limit.Limit, fw.timeSource), | ||
} | ||
} | ||
} | ||
|
||
func (fw *FixedWindowImpl) IsOverLimit(limit *config.RateLimit, results int64, hitsAddend int64) (bool, int64, int) { | ||
limitAfterIncrease := results | ||
limitBeforeIncrease := limitAfterIncrease - int64(hitsAddend) | ||
overLimitThreshold := int64(limit.Limit.RequestsPerUnit) | ||
nearLimitThreshold := int64(math.Floor(float64(float32(overLimitThreshold) * fw.nearLimitRatio))) | ||
|
||
if limitAfterIncrease > overLimitThreshold { | ||
if limitBeforeIncrease >= overLimitThreshold { | ||
fw.PopulateStats(limit, 0, uint64(hitsAddend), 0) | ||
} else { | ||
fw.PopulateStats(limit, uint64(overLimitThreshold-utils.MaxInt64(nearLimitThreshold, limitBeforeIncrease)), uint64(limitAfterIncrease-overLimitThreshold), 0) | ||
} | ||
|
||
return true, 0, int(utils.UnitToDivider(limit.Limit.Unit)) | ||
} else { | ||
if limitAfterIncrease > nearLimitThreshold { | ||
if limitBeforeIncrease >= nearLimitThreshold { | ||
fw.PopulateStats(limit, uint64(hitsAddend), 0, 0) | ||
} else { | ||
fw.PopulateStats(limit, uint64(limitAfterIncrease-nearLimitThreshold), 0, 0) | ||
} | ||
} | ||
|
||
return false, overLimitThreshold - limitAfterIncrease, int(utils.UnitToDivider(limit.Limit.Unit)) | ||
} | ||
} | ||
|
||
func (fw *FixedWindowImpl) IsOverLimitWithLocalCache(key string) bool { | ||
if fw.localCache != nil { | ||
_, err := fw.localCache.Get([]byte(key)) | ||
if err == nil { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
func (fw *FixedWindowImpl) GenerateCacheKeys(request *pb.RateLimitRequest, | ||
limits []*config.RateLimit, hitsAddend int64) []utils.CacheKey { | ||
return fw.cacheKeyGenerator.GenerateCacheKeys(request, limits, uint32(hitsAddend), fw.timeSource.UnixNow()) | ||
} | ||
|
||
func (fw *FixedWindowImpl) PopulateStats(limit *config.RateLimit, nearLimit uint64, overLimit uint64, overLimitWithLocalCache uint64) { | ||
limit.Stats.NearLimit.Add(nearLimit) | ||
limit.Stats.OverLimit.Add(overLimit) | ||
limit.Stats.OverLimitWithLocalCache.Add(overLimitWithLocalCache) | ||
} | ||
|
||
func (fw *FixedWindowImpl) GetExpirationSeconds() int64 { | ||
return 0 | ||
} | ||
|
||
func (fw *FixedWindowImpl) GetResultsAfterIncrease() int64 { | ||
return 0 | ||
} | ||
|
||
func NewFixedWindowAlgorithm(timeSource utils.TimeSource, localCache *freecache.Cache, nearLimitRatio float32, cacheKeyPrefix string) *FixedWindowImpl { | ||
return &FixedWindowImpl{ | ||
timeSource: timeSource, | ||
cacheKeyGenerator: utils.NewCacheKeyGenerator(cacheKeyPrefix), | ||
localCache: localCache, | ||
nearLimitRatio: nearLimitRatio, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package algorithm | ||
|
||
import ( | ||
pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" | ||
"github.com/envoyproxy/ratelimit/src/config" | ||
"github.com/envoyproxy/ratelimit/src/utils" | ||
) | ||
|
||
type RatelimitAlgorithm interface { | ||
IsOverLimit(limit *config.RateLimit, results int64, hitsAddend int64) (bool, int64, int) | ||
IsOverLimitWithLocalCache(key string) bool | ||
|
||
GetResponseDescriptorStatus(key string, limit *config.RateLimit, results int64, isOverLimitWithLocalCache bool, hitsAddend int64) *pb.RateLimitResponse_DescriptorStatus | ||
GetExpirationSeconds() int64 | ||
GetResultsAfterIncrease() int64 | ||
|
||
GenerateCacheKeys(request *pb.RateLimitRequest, | ||
limits []*config.RateLimit, hitsAddend int64) []utils.CacheKey | ||
PopulateStats(limit *config.RateLimit, nearLimit uint64, overLimit uint64, overLimitWithLocalCache uint64) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
package algorithm | ||
|
||
import ( | ||
"math" | ||
|
||
"github.com/coocood/freecache" | ||
pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3" | ||
"github.com/envoyproxy/ratelimit/src/config" | ||
"github.com/envoyproxy/ratelimit/src/utils" | ||
"github.com/golang/protobuf/ptypes/duration" | ||
logger "github.com/sirupsen/logrus" | ||
) | ||
|
||
const DummyCacheKeyTime = 0 | ||
|
||
var _ RatelimitAlgorithm = (*RollingWindowImpl)(nil) | ||
|
||
type RollingWindowImpl struct { | ||
timeSource utils.TimeSource | ||
cacheKeyGenerator utils.CacheKeyGenerator | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same for |
||
localCache *freecache.Cache | ||
nearLimitRatio float32 | ||
arrivedAt int64 | ||
tat int64 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: either expand the field names or add explanatory comments. |
||
newTat int64 | ||
diff int64 | ||
} | ||
|
||
func (rw *RollingWindowImpl) GetResponseDescriptorStatus(key string, limit *config.RateLimit, results int64, isOverLimitWithLocalCache bool, hitsAddend int64) *pb.RateLimitResponse_DescriptorStatus { | ||
if key == "" { | ||
return &pb.RateLimitResponse_DescriptorStatus{ | ||
Code: pb.RateLimitResponse_OK, | ||
CurrentLimit: nil, | ||
LimitRemaining: 0, | ||
} | ||
} | ||
|
||
if isOverLimitWithLocalCache { | ||
rw.PopulateStats(limit, 0, uint64(hitsAddend), uint64(hitsAddend)) | ||
|
||
secondsToReset := utils.UnitToDivider(limit.Limit.Unit) | ||
secondsToReset -= utils.NanosecondsToSeconds(rw.timeSource.UnixNanoNow()) % secondsToReset | ||
return &pb.RateLimitResponse_DescriptorStatus{ | ||
Code: pb.RateLimitResponse_OVER_LIMIT, | ||
CurrentLimit: limit.Limit, | ||
LimitRemaining: 0, | ||
DurationUntilReset: &duration.Duration{Seconds: secondsToReset}, | ||
} | ||
} | ||
|
||
isOverLimit, limitRemaining, durationUntilReset := rw.IsOverLimit(limit, int64(results), hitsAddend) | ||
|
||
if !isOverLimit { | ||
return &pb.RateLimitResponse_DescriptorStatus{ | ||
Code: pb.RateLimitResponse_OK, | ||
CurrentLimit: limit.Limit, | ||
LimitRemaining: uint32(limitRemaining), | ||
DurationUntilReset: utils.NanosecondsToDuration(rw.newTat - rw.arrivedAt), | ||
} | ||
} else { | ||
if rw.localCache != nil { | ||
durationUntilReset = utils.MaxInt(1, durationUntilReset) | ||
|
||
err := rw.localCache.Set([]byte(key), []byte{}, durationUntilReset) | ||
if err != nil { | ||
logger.Errorf("Failing to set local cache key: %s", key) | ||
} | ||
} | ||
|
||
return &pb.RateLimitResponse_DescriptorStatus{ | ||
Code: pb.RateLimitResponse_OVER_LIMIT, | ||
CurrentLimit: limit.Limit, | ||
LimitRemaining: 0, | ||
DurationUntilReset: utils.NanosecondsToDuration(int64(math.Ceil(float64(rw.tat - rw.arrivedAt)))), | ||
} | ||
} | ||
} | ||
|
||
func (rw *RollingWindowImpl) IsOverLimit(limit *config.RateLimit, results int64, hitsAddend int64) (bool, int64, int) { | ||
now := rw.timeSource.UnixNanoNow() | ||
|
||
// Time during computation should be in nanosecond | ||
rw.arrivedAt = now | ||
// Tat is set to current request timestamp if not set before | ||
rw.tat = utils.MaxInt64(results, rw.arrivedAt) | ||
totalLimit := int64(limit.Limit.RequestsPerUnit) | ||
period := utils.SecondsToNanoseconds(utils.UnitToDivider(limit.Limit.Unit)) | ||
quantity := int64(hitsAddend) | ||
|
||
// GCRA computation | ||
// Emission interval is the cost of each request | ||
emissionInterval := period / totalLimit | ||
// New tat define the end of the window | ||
rw.newTat = rw.tat + emissionInterval*quantity | ||
// We allow the request if it's inside the window | ||
allowAt := rw.newTat - period | ||
rw.diff = rw.arrivedAt - allowAt | ||
|
||
previousAllowAt := rw.tat - period | ||
previousLimitRemaining := int64(math.Ceil(float64((rw.arrivedAt - previousAllowAt) / emissionInterval))) | ||
previousLimitRemaining = utils.MaxInt64(previousLimitRemaining, 0) | ||
nearLimitWindow := int64(math.Ceil(float64(float32(limit.Limit.RequestsPerUnit) * (1.0 - rw.nearLimitRatio)))) | ||
limitRemaining := int64(math.Ceil(float64(rw.diff / emissionInterval))) | ||
hitNearLimit := quantity - (utils.MaxInt64(previousLimitRemaining, nearLimitWindow) - nearLimitWindow) | ||
|
||
if rw.diff < 0 { | ||
rw.PopulateStats(limit, uint64(utils.MinInt64(previousLimitRemaining, nearLimitWindow)), uint64(quantity-previousLimitRemaining), 0) | ||
|
||
return true, 0, int(utils.NanosecondsToSeconds(-rw.diff)) | ||
} else { | ||
if hitNearLimit > 0 { | ||
rw.PopulateStats(limit, uint64(hitNearLimit), 0, 0) | ||
} | ||
|
||
return false, limitRemaining, 0 | ||
} | ||
} | ||
|
||
func (rw *RollingWindowImpl) IsOverLimitWithLocalCache(key string) bool { | ||
if rw.localCache != nil { | ||
_, err := rw.localCache.Get([]byte(key)) | ||
if err == nil { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
func (rw *RollingWindowImpl) GetExpirationSeconds() int64 { | ||
if rw.diff < 0 { | ||
return utils.NanosecondsToSeconds(rw.tat-rw.arrivedAt) + 1 | ||
} | ||
return utils.NanosecondsToSeconds(rw.newTat-rw.arrivedAt) + 1 | ||
} | ||
|
||
func (rw *RollingWindowImpl) GetResultsAfterIncrease() int64 { | ||
if rw.diff < 0 { | ||
return rw.tat | ||
} | ||
return rw.newTat | ||
} | ||
|
||
func (rw *RollingWindowImpl) GenerateCacheKeys(request *pb.RateLimitRequest, | ||
limits []*config.RateLimit, hitsAddend int64) []utils.CacheKey { | ||
return rw.cacheKeyGenerator.GenerateCacheKeys(request, limits, uint32(hitsAddend), DummyCacheKeyTime) | ||
} | ||
|
||
func (rw *RollingWindowImpl) PopulateStats(limit *config.RateLimit, nearLimit uint64, overLimit uint64, overLimitWithLocalCache uint64) { | ||
limit.Stats.NearLimit.Add(nearLimit) | ||
limit.Stats.OverLimit.Add(overLimit) | ||
limit.Stats.OverLimitWithLocalCache.Add(overLimitWithLocalCache) | ||
} | ||
|
||
func NewRollingWindowAlgorithm(timeSource utils.TimeSource, localCache *freecache.Cache, nearLimitRatio float32, cacheKeyPrefix string) *RollingWindowImpl { | ||
return &RollingWindowImpl{ | ||
timeSource: timeSource, | ||
cacheKeyGenerator: utils.NewCacheKeyGenerator(cacheKeyPrefix), | ||
localCache: localCache, | ||
nearLimitRatio: nearLimitRatio, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:s/it is able/it is possible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be even more clear to rephrase like 'Initially rate limiter can take a burst of...`