Skip to content

Commit

Permalink
Increase rates for priority publishing (#328)
Browse files Browse the repository at this point in the history
* Increase rates for priority publishing

* Save file

* Fix other test
  • Loading branch information
neekolas authored Jan 5, 2024
1 parent 3947da3 commit 1189bb4
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
2 changes: 1 addition & 1 deletion pkg/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ func Test_Ratelimits_Priority(t *testing.T) {
limiter, ok := server.authorizer.Limiter.(*ratelimiter.TokenBucketRateLimiter)
require.True(t, ok)
limiter.Limits[ratelimiter.PUBLISH] = &ratelimiter.Limit{MaxTokens: 1, RatePerMinute: 0}
limiter.PriorityMultiplier = 2
limiter.PublishPriorityMultiplier = 2
envs := makeEnvelopes(3)
_, err = client.Publish(ctx, &messageV1.PublishRequest{Envelopes: envs[0:2]})
require.NoError(t, err)
Expand Down
37 changes: 22 additions & 15 deletions pkg/ratelimiter/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
type LimitType string

const (
PRIORITY_MULTIPLIER = uint16(5)
DEFAULT_RATE_PER_MINUTE = uint16(2000)
DEFAULT_MAX_TOKENS = uint16(10000)
PUBLISH_RATE_PER_MINUTE = uint16(200)
PUBLISH_MAX_TOKENS = uint16(1000)
MAX_UINT_16 = 65535
DEFAULT_PRIORITY_MULTIPLIER = uint16(5)
PUBLISH_PRIORITY_MULTIPLIER = uint16(25)
DEFAULT_RATE_PER_MINUTE = uint16(2000)
DEFAULT_MAX_TOKENS = uint16(10000)
PUBLISH_RATE_PER_MINUTE = uint16(200)
PUBLISH_MAX_TOKENS = uint16(1000)
MAX_UINT_16 = 65535

DEFAULT LimitType = "DEF"
PUBLISH LimitType = "PUB"
Expand Down Expand Up @@ -70,13 +71,14 @@ func (l Limit) Refill(entry *Entry, multiplier uint16) {

// TokenBucketRateLimiter implements the RateLimiter interface
type TokenBucketRateLimiter struct {
log *zap.Logger
ctx context.Context
mutex sync.RWMutex
newBuckets *Buckets // buckets that can be added to
oldBuckets *Buckets // buckets to be swept for expired entries
PriorityMultiplier uint16
Limits map[LimitType]*Limit
log *zap.Logger
ctx context.Context
mutex sync.RWMutex
newBuckets *Buckets // buckets that can be added to
oldBuckets *Buckets // buckets to be swept for expired entries
PriorityMultiplier uint16
PublishPriorityMultiplier uint16
Limits map[LimitType]*Limit
}

func NewTokenBucketRateLimiter(ctx context.Context, log *zap.Logger) *TokenBucketRateLimiter {
Expand All @@ -86,7 +88,8 @@ func NewTokenBucketRateLimiter(ctx context.Context, log *zap.Logger) *TokenBucke
// TODO: need to periodically clear out expired items to avoid unlimited growth of the map.
tb.newBuckets = NewBuckets(log, "buckets1")
tb.oldBuckets = NewBuckets(log, "buckets2")
tb.PriorityMultiplier = PRIORITY_MULTIPLIER
tb.PriorityMultiplier = DEFAULT_PRIORITY_MULTIPLIER
tb.PublishPriorityMultiplier = PUBLISH_PRIORITY_MULTIPLIER
tb.Limits = map[LimitType]*Limit{
DEFAULT: {DEFAULT_MAX_TOKENS, DEFAULT_RATE_PER_MINUTE},
PUBLISH: {PUBLISH_MAX_TOKENS, PUBLISH_RATE_PER_MINUTE},
Expand All @@ -106,7 +109,11 @@ func (rl *TokenBucketRateLimiter) fillAndReturnEntry(limitType LimitType, bucket
limit := rl.getLimit(limitType)
multiplier := uint16(1)
if isPriority {
multiplier = rl.PriorityMultiplier
if limitType == PUBLISH {
multiplier = rl.PublishPriorityMultiplier
} else {
multiplier = rl.PriorityMultiplier
}
}
rl.mutex.RLock()
if entry := rl.oldBuckets.getAndRefill(bucket, limit, multiplier, false); entry != nil {
Expand Down
14 changes: 12 additions & 2 deletions pkg/ratelimiter/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,17 @@ func TestSpendAllowListed(t *testing.T) {
// Set last seen to 5 minutes ago
entry.lastSeen = time.Now().Add(-5 * time.Minute)
entry = rl.fillAndReturnEntry(DEFAULT, walletAddress, true)
require.Equal(t, entry.tokens, uint16(5*DEFAULT_RATE_PER_MINUTE*PRIORITY_MULTIPLIER))
require.Equal(t, entry.tokens, uint16(5*DEFAULT_RATE_PER_MINUTE*DEFAULT_PRIORITY_MULTIPLIER))
}

func TestSpendAllowListedPublish(t *testing.T) {
logger, _ := zap.NewDevelopment()
rl := NewTokenBucketRateLimiter(context.Background(), logger)
entry := rl.newBuckets.getAndRefill(walletAddress, &Limit{0, 0}, 1, true)
// Set last seen to 5 minutes ago
entry.lastSeen = time.Now().Add(-5 * time.Minute)
entry = rl.fillAndReturnEntry(PUBLISH, walletAddress, true)
require.Equal(t, entry.tokens, uint16(5*PUBLISH_RATE_PER_MINUTE*PUBLISH_PRIORITY_MULTIPLIER))
}

func TestMaxUint16(t *testing.T) {
Expand All @@ -75,7 +85,7 @@ func TestMaxUint16(t *testing.T) {
// Set last seen to 1 million minutes ago
entry.lastSeen = time.Now().Add(-1000000 * time.Minute)
entry = rl.fillAndReturnEntry(DEFAULT, walletAddress, true)
require.Equal(t, entry.tokens, DEFAULT_MAX_TOKENS*PRIORITY_MULTIPLIER)
require.Equal(t, entry.tokens, DEFAULT_MAX_TOKENS*DEFAULT_PRIORITY_MULTIPLIER)
}

// Ensures that the map can be accessed concurrently
Expand Down

0 comments on commit 1189bb4

Please sign in to comment.