From 5b9971cc79b8ef1b075221a6324f0c1e536ff8e7 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Fri, 5 Jan 2024 09:40:09 -0800 Subject: [PATCH 1/3] Increase rates for priority publishing --- pkg/ratelimiter/rate_limiter.go | 30 +++++++++++++++------------- pkg/ratelimiter/rate_limiter_test.go | 14 +++++++++++-- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/pkg/ratelimiter/rate_limiter.go b/pkg/ratelimiter/rate_limiter.go index 00c865d0..5ed2f18d 100644 --- a/pkg/ratelimiter/rate_limiter.go +++ b/pkg/ratelimiter/rate_limiter.go @@ -14,12 +14,13 @@ import ( type LimitType string const ( - PRIORITY_MULTIPLIER = uint16(5) - DEFAULT_RATE_PER_MINUTE = uint16(2000) - DEFAULT_MAX_TOKENS = uint16(10000) - PUBLISH_RATE_PER_MINUTE = uint16(200) - PUBLISH_MAX_TOKENS = uint16(1000) - MAX_UINT_16 = 65535 + DEFAULT_PRIORITY_MULTIPLIER = uint16(5) + PUBLISH_PRIORITY_MULTIPLIER = uint16(25) + DEFAULT_RATE_PER_MINUTE = uint16(2000) + DEFAULT_MAX_TOKENS = uint16(10000) + PUBLISH_RATE_PER_MINUTE = uint16(200) + PUBLISH_MAX_TOKENS = uint16(1000) + MAX_UINT_16 = 65535 DEFAULT LimitType = "DEF" PUBLISH LimitType = "PUB" @@ -70,13 +71,14 @@ func (l Limit) Refill(entry *Entry, multiplier uint16) { // TokenBucketRateLimiter implements the RateLimiter interface type TokenBucketRateLimiter struct { - log *zap.Logger - ctx context.Context - mutex sync.RWMutex - newBuckets *Buckets // buckets that can be added to - oldBuckets *Buckets // buckets to be swept for expired entries - PriorityMultiplier uint16 - Limits map[LimitType]*Limit + log *zap.Logger + ctx context.Context + mutex sync.RWMutex + newBuckets *Buckets // buckets that can be added to + oldBuckets *Buckets // buckets to be swept for expired entries + PriorityMultiplier uint16 + PublishPriorityMultiplier uint16 + Limits map[LimitType]*Limit } func NewTokenBucketRateLimiter(ctx context.Context, log *zap.Logger) *TokenBucketRateLimiter { @@ -86,7 +88,7 @@ func NewTokenBucketRateLimiter(ctx context.Context, log *zap.Logger) *TokenBucke // TODO: need to periodically clear out expired items to avoid unlimited growth of the map. tb.newBuckets = NewBuckets(log, "buckets1") tb.oldBuckets = NewBuckets(log, "buckets2") - tb.PriorityMultiplier = PRIORITY_MULTIPLIER + tb.PriorityMultiplier = DEFAULT_PRIORITY_MULTIPLIER tb.Limits = map[LimitType]*Limit{ DEFAULT: {DEFAULT_MAX_TOKENS, DEFAULT_RATE_PER_MINUTE}, PUBLISH: {PUBLISH_MAX_TOKENS, PUBLISH_RATE_PER_MINUTE}, diff --git a/pkg/ratelimiter/rate_limiter_test.go b/pkg/ratelimiter/rate_limiter_test.go index c9046184..0cb30efc 100644 --- a/pkg/ratelimiter/rate_limiter_test.go +++ b/pkg/ratelimiter/rate_limiter_test.go @@ -65,7 +65,17 @@ func TestSpendAllowListed(t *testing.T) { // Set last seen to 5 minutes ago entry.lastSeen = time.Now().Add(-5 * time.Minute) entry = rl.fillAndReturnEntry(DEFAULT, walletAddress, true) - require.Equal(t, entry.tokens, uint16(5*DEFAULT_RATE_PER_MINUTE*PRIORITY_MULTIPLIER)) + require.Equal(t, entry.tokens, uint16(5*DEFAULT_RATE_PER_MINUTE*DEFAULT_PRIORITY_MULTIPLIER)) +} + +func TestSpendAllowListedPublish(t *testing.T) { + logger, _ := zap.NewDevelopment() + rl := NewTokenBucketRateLimiter(context.Background(), logger) + entry := rl.newBuckets.getAndRefill(walletAddress, &Limit{0, 0}, 1, true) + // Set last seen to 5 minutes ago + entry.lastSeen = time.Now().Add(-5 * time.Minute) + entry = rl.fillAndReturnEntry(PUBLISH, walletAddress, true) + require.Equal(t, entry.tokens, uint16(5*PUBLISH_RATE_PER_MINUTE*PUBLISH_PRIORITY_MULTIPLIER)) } func TestMaxUint16(t *testing.T) { @@ -75,7 +85,7 @@ func TestMaxUint16(t *testing.T) { // Set last seen to 1 million minutes ago entry.lastSeen = time.Now().Add(-1000000 * time.Minute) entry = rl.fillAndReturnEntry(DEFAULT, walletAddress, true) - require.Equal(t, entry.tokens, DEFAULT_MAX_TOKENS*PRIORITY_MULTIPLIER) + require.Equal(t, entry.tokens, DEFAULT_MAX_TOKENS*DEFAULT_PRIORITY_MULTIPLIER) } // Ensures that the map can be accessed concurrently From ee39236de6d5421404c88752574fa4da075156b6 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Fri, 5 Jan 2024 09:48:25 -0800 Subject: [PATCH 2/3] Save file --- pkg/ratelimiter/rate_limiter.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/ratelimiter/rate_limiter.go b/pkg/ratelimiter/rate_limiter.go index 5ed2f18d..0d391925 100644 --- a/pkg/ratelimiter/rate_limiter.go +++ b/pkg/ratelimiter/rate_limiter.go @@ -89,6 +89,7 @@ func NewTokenBucketRateLimiter(ctx context.Context, log *zap.Logger) *TokenBucke tb.newBuckets = NewBuckets(log, "buckets1") tb.oldBuckets = NewBuckets(log, "buckets2") tb.PriorityMultiplier = DEFAULT_PRIORITY_MULTIPLIER + tb.PublishPriorityMultiplier = PUBLISH_PRIORITY_MULTIPLIER tb.Limits = map[LimitType]*Limit{ DEFAULT: {DEFAULT_MAX_TOKENS, DEFAULT_RATE_PER_MINUTE}, PUBLISH: {PUBLISH_MAX_TOKENS, PUBLISH_RATE_PER_MINUTE}, @@ -108,7 +109,11 @@ func (rl *TokenBucketRateLimiter) fillAndReturnEntry(limitType LimitType, bucket limit := rl.getLimit(limitType) multiplier := uint16(1) if isPriority { - multiplier = rl.PriorityMultiplier + if limitType == PUBLISH { + multiplier = rl.PublishPriorityMultiplier + } else { + multiplier = rl.PriorityMultiplier + } } rl.mutex.RLock() if entry := rl.oldBuckets.getAndRefill(bucket, limit, multiplier, false); entry != nil { From c465e1171e1d03a04caeacb758148cc91be57292 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Fri, 5 Jan 2024 09:57:42 -0800 Subject: [PATCH 3/3] Fix other test --- pkg/api/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index 9ac580a4..df65d0b2 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -802,7 +802,7 @@ func Test_Ratelimits_Priority(t *testing.T) { limiter, ok := server.authorizer.Limiter.(*ratelimiter.TokenBucketRateLimiter) require.True(t, ok) limiter.Limits[ratelimiter.PUBLISH] = &ratelimiter.Limit{MaxTokens: 1, RatePerMinute: 0} - limiter.PriorityMultiplier = 2 + limiter.PublishPriorityMultiplier = 2 envs := makeEnvelopes(3) _, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[0:2]}) require.NoError(t, err)