From 10884f0a991529b9f98df37de3156607c35b91c0 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Wed, 3 Feb 2021 12:26:50 -0800 Subject: [PATCH] Deprecate token bucket (#1253) * Update visibilitySamplingClient using quotas rate limiter * Fix multi stage rate limiter allow function --- .../visibilitySamplingClient_test.go | 2 +- .../persistence/visibilitySamplingClient.go | 148 +++----- .../quotas/multi_stage_rate_limiter_impl.go | 6 +- .../multi_stage_rate_limiter_impl_test.go | 24 +- common/tokenbucket/tb.go | 347 ------------------ common/tokenbucket/tb_test.go | 232 ------------ tools/cli/adminElasticSearchCommands.go | 12 +- 7 files changed, 92 insertions(+), 679 deletions(-) delete mode 100644 common/tokenbucket/tb.go delete mode 100644 common/tokenbucket/tb_test.go diff --git a/common/persistence/persistence-tests/visibilitySamplingClient_test.go b/common/persistence/persistence-tests/visibilitySamplingClient_test.go index f58c4a8c5e9..96a5e62e1e1 100644 --- a/common/persistence/persistence-tests/visibilitySamplingClient_test.go +++ b/common/persistence/persistence-tests/visibilitySamplingClient_test.go @@ -73,7 +73,7 @@ func (s *VisibilitySamplingSuite) SetupTest() { s.persistence = &mocks.VisibilityManager{} config := &c.VisibilityConfig{ VisibilityOpenMaxQPS: dynamicconfig.GetIntPropertyFilteredByNamespace(1), - VisibilityClosedMaxQPS: dynamicconfig.GetIntPropertyFilteredByNamespace(10), + VisibilityClosedMaxQPS: dynamicconfig.GetIntPropertyFilteredByNamespace(2), VisibilityListMaxQPS: dynamicconfig.GetIntPropertyFilteredByNamespace(1), } s.metricClient = &mmocks.Client{} diff --git a/common/persistence/visibilitySamplingClient.go b/common/persistence/visibilitySamplingClient.go index a52b38fb101..0e31e3bae8c 100644 --- a/common/persistence/visibilitySamplingClient.go +++ b/common/persistence/visibilitySamplingClient.go @@ -25,29 +25,17 @@ package persistence import ( - "sync" - - enumspb "go.temporal.io/api/enums/v1" - - "go.temporal.io/server/common/clock" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/service/config" - "go.temporal.io/server/common/tokenbucket" -) - -const ( - // To sample visibility request, open has only 1 bucket, closed has 2 - numOfPriorityForOpen = 1 - numOfPriorityForClosed = 2 - numOfPriorityForList = 1 ) type visibilitySamplingClient struct { - rateLimitersForOpen *namespaceToBucketMap - rateLimitersForClosed *namespaceToBucketMap - rateLimitersForList *namespaceToBucketMap + rateLimitersForOpen quotas.NamespaceRateLimiter + rateLimitersForClosed quotas.NamespaceRateLimiter + rateLimitersForList quotas.NamespaceRateLimiter persistence VisibilityManager config *config.VisibilityConfig metricClient metrics.Client @@ -57,55 +45,57 @@ type visibilitySamplingClient struct { var _ VisibilityManager = (*visibilitySamplingClient)(nil) // NewVisibilitySamplingClient creates a client to manage visibility with sampling -func NewVisibilitySamplingClient(persistence VisibilityManager, config *config.VisibilityConfig, metricClient metrics.Client, logger log.Logger) VisibilityManager { - return &visibilitySamplingClient{ - persistence: persistence, - rateLimitersForOpen: newNamespaceToBucketMap(), - rateLimitersForClosed: newNamespaceToBucketMap(), - rateLimitersForList: newNamespaceToBucketMap(), - config: config, - metricClient: metricClient, - logger: logger, - } -} - -type namespaceToBucketMap struct { - sync.RWMutex - mappings 
map[string]tokenbucket.PriorityTokenBucket
-}
-
-func newNamespaceToBucketMap() *namespaceToBucketMap {
-	return &namespaceToBucketMap{
-		mappings: make(map[string]tokenbucket.PriorityTokenBucket),
-	}
-}
-
-func (m *namespaceToBucketMap) getRateLimiter(namespace string, numOfPriority, qps int) tokenbucket.PriorityTokenBucket {
-	m.RLock()
-	rateLimiter, exist := m.mappings[namespace]
-	m.RUnlock()
-
-	if exist {
-		return rateLimiter
+func NewVisibilitySamplingClient(
+	persistence VisibilityManager,
+	config *config.VisibilityConfig,
+	metricClient metrics.Client,
+	logger log.Logger,
+) VisibilityManager {
+
+	var persistenceRateLimiter []quotas.RateLimiter
+	if config.MaxQPS != nil {
+		persistenceRateLimiter = append(persistenceRateLimiter, quotas.NewDefaultIncomingDynamicRateLimiter(
+			func() float64 { return float64(config.MaxQPS()) },
+		))
 	}
 
-	m.Lock()
-	if rateLimiter, ok := m.mappings[namespace]; ok { // read again to ensure no duplicate create
-		m.Unlock()
-		return rateLimiter
+	return &visibilitySamplingClient{
+		persistence: persistence,
+		rateLimitersForOpen: quotas.NewNamespaceMultiStageRateLimiter(
+			func(namespace string) quotas.RateLimiter {
+				return quotas.NewDefaultOutgoingDynamicRateLimiter(
+					func() float64 { return float64(config.VisibilityOpenMaxQPS(namespace)) },
+				)
+			},
+			persistenceRateLimiter,
+		),
+		rateLimitersForClosed: quotas.NewNamespaceMultiStageRateLimiter(
+			func(namespace string) quotas.RateLimiter {
+				return quotas.NewDefaultOutgoingDynamicRateLimiter(
+					func() float64 { return float64(config.VisibilityClosedMaxQPS(namespace)) },
+				)
+			},
+			persistenceRateLimiter,
+		),
+		rateLimitersForList: quotas.NewNamespaceMultiStageRateLimiter(
+			func(namespace string) quotas.RateLimiter {
+				return quotas.NewDefaultOutgoingDynamicRateLimiter(
+					func() float64 { return float64(config.VisibilityListMaxQPS(namespace)) },
+				)
+			},
+			persistenceRateLimiter,
+		),
+		config:       config,
+		metricClient: metricClient,
+		logger:       logger,
 	}
-	rateLimiter = tokenbucket.NewFullPriorityTokenBucket(numOfPriority, qps, clock.NewRealTimeSource())
-	m.mappings[namespace] = rateLimiter
-	m.Unlock()
-	return rateLimiter
 }
 
 func (p *visibilitySamplingClient) RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error {
 	namespace := request.Namespace
 	namespaceID := request.NamespaceID
 
-	rateLimiter := p.rateLimitersForOpen.getRateLimiter(namespace, numOfPriorityForOpen, p.config.VisibilityOpenMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); ok {
+	if ok := p.rateLimitersForOpen.Allow(namespace); ok {
 		return p.persistence.RecordWorkflowExecutionStarted(request)
 	}
 
@@ -124,8 +114,7 @@ func (p *visibilitySamplingClient) RecordWorkflowExecutionStartedV2(request *Rec
 	namespace := request.Namespace
 	namespaceID := request.NamespaceID
 
-	rateLimiter := p.rateLimitersForOpen.getRateLimiter(namespace, numOfPriorityForOpen, p.config.VisibilityOpenMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); ok {
+	if ok := p.rateLimitersForOpen.Allow(namespace); ok {
 		return p.persistence.RecordWorkflowExecutionStartedV2(request)
 	}
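Note on the constructor above: each visibility operation type (open, closed, list) now gets a per-namespace rate limiter driven by its dynamic-config QPS, and all three share the same optional persistence-wide limiter built from config.MaxQPS, so combined visibility traffic stays under the host-wide cap. A minimal, self-contained sketch of this two-stage composition, using golang.org/x/time/rate in place of the quotas package; twoStageLimiter, newNSLimit, and shared are illustrative names, not part of this change:

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

type twoStageLimiter struct {
	mu         sync.Mutex
	perNS      map[string]*rate.Limiter
	newNSLimit func(namespace string) *rate.Limiter // stands in for e.g. VisibilityOpenMaxQPS(namespace)
	shared     *rate.Limiter                        // stands in for the persistence-wide MaxQPS stage; may be nil
}

func (l *twoStageLimiter) Allow(namespace string) bool {
	l.mu.Lock()
	nsLimiter, ok := l.perNS[namespace]
	if !ok {
		nsLimiter = l.newNSLimit(namespace)
		l.perNS[namespace] = nsLimiter
	}
	l.mu.Unlock()

	now := time.Now()
	// Reserve from the namespace stage first; only commit if the token is
	// available immediately (a delayed reservation is not an allowance).
	r := nsLimiter.ReserveN(now, 1)
	if !r.OK() || r.DelayFrom(now) > 0 {
		if r.OK() {
			r.CancelAt(now)
		}
		return false
	}
	if l.shared != nil && !l.shared.AllowN(now, 1) {
		r.CancelAt(now) // shared stage refused: hand the namespace token back
		return false
	}
	return true
}

func main() {
	l := &twoStageLimiter{
		perNS:      map[string]*rate.Limiter{},
		newNSLimit: func(string) *rate.Limiter { return rate.NewLimiter(1, 1) },
		shared:     rate.NewLimiter(2, 2),
	}
	fmt.Println(l.Allow("ns-a"), l.Allow("ns-a")) // true false at 1 QPS per namespace
}

The reserve-then-cancel step mirrors the MultiStageRateLimiterImpl.AllowN fix further down in this patch.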
@@ -143,10 +132,8 @@ func (p *visibilitySamplingClient) RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error {
 	namespace := request.Namespace
 	namespaceID := request.NamespaceID
-	priority := getRequestPriority(request)
 
-	rateLimiter := p.rateLimitersForClosed.getRateLimiter(namespace, numOfPriorityForClosed, p.config.VisibilityClosedMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(priority, 1); ok {
+	if ok := p.rateLimitersForClosed.Allow(namespace); ok {
 		return p.persistence.RecordWorkflowExecutionClosed(request)
 	}
 
@@ -164,10 +151,8 @@ func (p *visibilitySamplingClient) RecordWorkflowExecutionClosedV2(request *RecordWorkflowExecutionClosedRequest) error {
 	namespace := request.Namespace
 	namespaceID := request.NamespaceID
-	priority := getRequestPriority(request)
 
-	rateLimiter := p.rateLimitersForClosed.getRateLimiter(namespace, numOfPriorityForClosed, p.config.VisibilityClosedMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(priority, 1); ok {
+	if ok := p.rateLimitersForClosed.Allow(namespace); ok {
 		return p.persistence.RecordWorkflowExecutionClosedV2(request)
 	}
 
@@ -186,8 +171,7 @@ func (p *visibilitySamplingClient) UpsertWorkflowExecution(request *UpsertWorkfl
 	namespace := request.Namespace
 	namespaceID := request.NamespaceID
 
-	rateLimiter := p.rateLimitersForClosed.getRateLimiter(namespace, numOfPriorityForClosed, p.config.VisibilityClosedMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); ok {
+	if ok := p.rateLimitersForClosed.Allow(namespace); ok {
 		return p.persistence.UpsertWorkflowExecution(request)
 	}
 
@@ -206,8 +190,7 @@ func (p *visibilitySamplingClient) UpsertWorkflowExecutionV2(request *UpsertWork
 	namespace := request.Namespace
 	namespaceID := request.NamespaceID
 
-	rateLimiter := p.rateLimitersForClosed.getRateLimiter(namespace, numOfPriorityForClosed, p.config.VisibilityClosedMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); ok {
+	if ok := p.rateLimitersForClosed.Allow(namespace); ok {
 		return p.persistence.UpsertWorkflowExecutionV2(request)
 	}
 
@@ -225,8 +208,7 @@ func (p *visibilitySamplingClient) ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
 	namespace := request.Namespace
 
-	rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); !ok {
+	if ok := p.rateLimitersForList.Allow(namespace); !ok {
 		return nil, ErrPersistenceLimitExceededForList
 	}
 
@@ -236,8 +218,7 @@ func (p *visibilitySamplingClient) ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
 	namespace := request.Namespace
 
-	rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); !ok {
+	if ok := p.rateLimitersForList.Allow(namespace); !ok {
 		return nil, ErrPersistenceLimitExceededForList
 	}
 
@@ -247,8 +228,7 @@ func (p *visibilitySamplingClient) ListOpenWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) {
 	namespace := request.Namespace
 
-	rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); !ok {
+	if ok := p.rateLimitersForList.Allow(namespace); !ok {
 		return nil, ErrPersistenceLimitExceededForList
 	}
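With the priority bucket gone, closed-workflow records are no longer split into two priority classes; every record/upsert path samples with a plain per-namespace Allow, while the list paths keep rejecting outright with ErrPersistenceLimitExceededForList. A sketch of how a caller might handle the list-side rejection; listWithThrottleHandling is a hypothetical wrapper, not part of this patch:

package sketch

import (
	"go.temporal.io/server/common/persistence"
)

// listWithThrottleHandling distinguishes quota rejection from genuine
// persistence failure; record/upsert writes never surface this error
// because over-quota records are sampled away inside the client.
func listWithThrottleHandling(
	mgr persistence.VisibilityManager,
	req *persistence.ListWorkflowExecutionsRequest,
) (*persistence.ListWorkflowExecutionsResponse, error) {
	resp, err := mgr.ListOpenWorkflowExecutions(req)
	if err == persistence.ErrPersistenceLimitExceededForList {
		// over the namespace's list quota: treat as retryable / back off
		return nil, err
	}
	return resp, err
}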
ListOpenWorkflowExecutionsByType(request *Lis
 func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) {
 	namespace := request.Namespace
 
-	rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); !ok {
+	if ok := p.rateLimitersForList.Allow(namespace); !ok {
 		return nil, ErrPersistenceLimitExceededForList
 	}
 
@@ -269,8 +248,7 @@ func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByType(request *L
 func (p *visibilitySamplingClient) ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) {
 	namespace := request.Namespace
 
-	rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); !ok {
+	if ok := p.rateLimitersForList.Allow(namespace); !ok {
 		return nil, ErrPersistenceLimitExceededForList
 	}
 
@@ -280,8 +258,7 @@ func (p *visibilitySamplingClient) ListOpenWorkflowExecutionsByWorkflowID(reques
 func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) {
 	namespace := request.Namespace
 
-	rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); !ok {
+	if ok := p.rateLimitersForList.Allow(namespace); !ok {
 		return nil, ErrPersistenceLimitExceededForList
 	}
 
@@ -291,8 +268,7 @@ func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByWorkflowID(requ
 func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error) {
 	namespace := request.Namespace
 
-	rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
-	if ok, _ := rateLimiter.GetToken(0, 1); !ok {
+	if ok := p.rateLimitersForList.Allow(namespace); !ok {
 		return nil, ErrPersistenceLimitExceededForList
 	}
 
@@ -330,11 +306,3 @@ func (p *visibilitySamplingClient) Close() {
 func (p *visibilitySamplingClient) GetName() string {
 	return p.persistence.GetName()
 }
-
-func getRequestPriority(request *RecordWorkflowExecutionClosedRequest) int {
-	priority := 0
-	if request.Status == enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED {
-		priority = 1 // low priority for completed workflows
-	}
-	return priority
-}
diff --git a/common/quotas/multi_stage_rate_limiter_impl.go b/common/quotas/multi_stage_rate_limiter_impl.go
index ab45b70844d..7a72252dd8e 100644
--- a/common/quotas/multi_stage_rate_limiter_impl.go
+++ b/common/quotas/multi_stage_rate_limiter_impl.go
@@ -84,7 +84,11 @@ func (rl *MultiStageRateLimiterImpl) AllowN(now time.Time, numToken int) bool {
 
 	for _, rateLimiter := range rl.rateLimiters {
 		reservation := rateLimiter.ReserveN(now, numToken)
-		if !reservation.OK() {
+		if !reservation.OK() || reservation.DelayFrom(now) > 0 {
+			if reservation.OK() {
+				reservation.CancelAt(now)
+			}
+
 			// cancel all existing reservation
 			for _, reservation := range reservations {
 				reservation.CancelAt(now)
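This hunk is the "fix multi stage rate limiter allow function" named in the commit message: ReserveN can hand back a reservation that is OK() but only usable after a delay, and the old code treated that as an immediate allowance, so AllowN could return true while a stage was already over its rate, and the delayed tokens were never returned. The fix treats a delayed reservation as a failure, cancels it, and rolls back the earlier stages. A condensed, self-contained sketch of the corrected loop, assuming (as the method names suggest) that quotas reservations mirror golang.org/x/time/rate:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// allowN reserves numToken from every stage; a stage that fails outright, or
// grants only after a delay, aborts the attempt, and every reservation made
// so far is cancelled so no tokens leak.
func allowN(limiters []*rate.Limiter, now time.Time, numToken int) bool {
	var reservations []*rate.Reservation
	for _, limiter := range limiters {
		reservation := limiter.ReserveN(now, numToken)
		if !reservation.OK() || reservation.DelayFrom(now) > 0 {
			if reservation.OK() {
				// granted, but only after a wait: hand the tokens back
				reservation.CancelAt(now)
			}
			// roll back the stages that already granted immediately
			for _, prev := range reservations {
				prev.CancelAt(now)
			}
			return false
		}
		reservations = append(reservations, reservation)
	}
	return true
}

func main() {
	stages := []*rate.Limiter{rate.NewLimiter(1, 1), rate.NewLimiter(100, 100)}
	now := time.Now()
	fmt.Println(allowN(stages, now, 1)) // true: both stages have burst available
	fmt.Println(allowN(stages, now, 1)) // false: stage 1 would delay; nothing leaks
}

The test changes that follow cover exactly these two failure shapes: Case1 (a later stage fails outright) and Case2 (a later stage is OK but delayed).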
diff --git a/common/quotas/multi_stage_rate_limiter_impl_test.go b/common/quotas/multi_stage_rate_limiter_impl_test.go
index 640ed62bfd2..a6a6516a88f 100644
--- a/common/quotas/multi_stage_rate_limiter_impl_test.go
+++ b/common/quotas/multi_stage_rate_limiter_impl_test.go
@@ -123,15 +123,35 @@ func (s *multiStageRateLimiterSuite) TestAllowN_NonSuccess() {
 	s.False(result)
 }
 
-func (s *multiStageRateLimiterSuite) TestAllowN_SomeSuccess() {
+func (s *multiStageRateLimiterSuite) TestAllowN_SomeSuccess_Case1() {
 	now := time.Now()
 	numToken := 2
 
 	s.firstReservation.EXPECT().OK().Return(true).AnyTimes()
+	s.firstReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
 	s.firstReservation.EXPECT().CancelAt(now).Times(1)
 	s.firstRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.firstReservation).Times(1)
 
 	s.secondReservation.EXPECT().OK().Return(false).AnyTimes()
+	s.secondReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
+	s.secondRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.secondReservation).Times(1)
+
+	result := s.rateLimiter.AllowN(now, numToken)
+	s.False(result)
+}
+
+func (s *multiStageRateLimiterSuite) TestAllowN_SomeSuccess_Case2() {
+	now := time.Now()
+	numToken := 2
+
+	s.firstReservation.EXPECT().OK().Return(true).AnyTimes()
+	s.firstReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
+	s.firstReservation.EXPECT().CancelAt(now).Times(1)
+	s.firstRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.firstReservation).Times(1)
+
+	s.secondReservation.EXPECT().OK().Return(true).AnyTimes()
+	s.secondReservation.EXPECT().DelayFrom(now).Return(time.Duration(1)).AnyTimes()
+	s.secondReservation.EXPECT().CancelAt(now).Times(1)
 	s.secondRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.secondReservation).Times(1)
 
 	result := s.rateLimiter.AllowN(now, numToken)
 	s.False(result)
@@ -143,9 +163,11 @@ func (s *multiStageRateLimiterSuite) TestAllowN_AllSuccess() {
 	numToken := 2
 
 	s.firstReservation.EXPECT().OK().Return(true).AnyTimes()
+	s.firstReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
 	s.firstRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.firstReservation).Times(1)
 
 	s.secondReservation.EXPECT().OK().Return(true).AnyTimes()
+	s.secondReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
 	s.secondRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.secondReservation).Times(1)
 
 	result := s.rateLimiter.AllowN(now, numToken)
diff --git a/common/tokenbucket/tb.go b/common/tokenbucket/tb.go
deleted file mode 100644
index d68d7ef0a7a..00000000000
--- a/common/tokenbucket/tb.go
+++ /dev/null
@@ -1,347 +0,0 @@
-// The MIT License
-//
-// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
-//
-// Copyright (c) 2020 Uber Technologies, Inc.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a copy
-// of this software and associated documentation files (the "Software"), to deal
-// in the Software without restriction, including without limitation the rights
-// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-// copies of the Software, and to permit persons to whom the Software is
-// furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package tokenbucket - -import ( - "sync" - "sync/atomic" - "time" - - "go.temporal.io/server/common/clock" - "go.temporal.io/server/common/service/dynamicconfig" -) - -type ( - // TokenBucket is the interface for any implementation of a token bucket rate limiter - TokenBucket interface { - // TryConsume attempts to take count tokens from the - // bucket. Returns true on success, false - // otherwise along with the duration for the next refill - TryConsume(count int) (bool, time.Duration) - // Consume waits up to timeout duration to take count - // tokens from the bucket. Returns true if count - // tokens were acquired before timeout, false - // otherwise - Consume(count int, timeout time.Duration) bool - } - - // PriorityTokenBucket is the interface for rate limiter with priority - PriorityTokenBucket interface { - // GetToken attempts to take count tokens from the - // bucket with that priority. Priority 0 is highest. - // Returns true on success, false - // otherwise along with the duration for the next refill - GetToken(priority, count int) (bool, time.Duration) - } - - tokenBucketImpl struct { - sync.Mutex - tokens int - fillRate int // amount of tokens to add every interval - fillInterval time.Duration // time between refills - // Because we divide the per-second quota equally - // every 100 millis, there could be a remainder when - // the desired rate is not a multiple 10 (1second/100Millis) - // To overcome this, we keep track of left over remainder - // and distribute this evenly during every fillInterval - overflowRps int - overflowTokens int - nextRefillTime time.Time - nextOverflowRefillTime time.Time - timeSource clock.TimeSource - } - - dynamicTokenBucketImpl struct { - tb *tokenBucketImpl - currentRPS int32 - rps dynamicconfig.IntPropertyFn - } - - priorityTokenBucketImpl struct { - sync.Mutex - tokens []int - fillRate int - fillInterval time.Duration - nextRefillTime time.Time - // Because we divide the per-second quota equally - // every 100 millis, there could be a remainder when - // the desired rate is not a multiple 10 (1second/100Millis) - // To overcome this, we keep track of left over remainder - // and distribute this evenly during every fillInterval - overflowRps int - overflowTokens int - nextOverflowRefillTime time.Time - timeSource clock.TimeSource - } -) - -const ( - millisPerSecond = 1000 - backoffInterval = 10 * time.Millisecond -) - -// New creates and returns a -// new token bucket rate limiter that -// replenishes the bucket every 100 -// milliseconds. Thread safe. -// -// @param rps -// Desired rate per second -// -// Golang.org has an alternative implementation -// of the rate limiter. On benchmarking, golang's -// implementation was order of magnitude slower. -// In addition, it does a lot more than what we -// need. 
These are the benchmarks under different -// scenarios -// -// BenchmarkTokenBucketParallel 50000000 40.7 ns/op -// BenchmarkGolangRateParallel 10000000 150 ns/op -// BenchmarkTokenBucketParallel-8 20000000 124 ns/op -// BenchmarkGolangRateParallel-8 10000000 208 ns/op -// BenchmarkTokenBucketParallel 50000000 37.8 ns/op -// BenchmarkGolangRateParallel 10000000 153 ns/op -// BenchmarkTokenBucketParallel-8 10000000 129 ns/op -// BenchmarkGolangRateParallel-8 10000000 208 ns/op -// -func New(rps int, timeSource clock.TimeSource) TokenBucket { - return newTokenBucket(rps, timeSource) -} - -func newTokenBucket(rps int, timeSource clock.TimeSource) *tokenBucketImpl { - tb := new(tokenBucketImpl) - tb.timeSource = timeSource - tb.reset(rps) - return tb -} - -func (tb *tokenBucketImpl) TryConsume(count int) (bool, time.Duration) { - now := tb.timeSource.Now() - tb.Lock() - tb.refill(now) - nextRefillTime := tb.nextRefillTime.Sub(now) - if tb.tokens < count { - tb.Unlock() - return false, nextRefillTime - } - tb.tokens -= count - tb.Unlock() - return true, nextRefillTime -} - -func (tb *tokenBucketImpl) Consume(count int, timeout time.Duration) bool { - - var remTime = timeout - var expiryTime = time.Now().Add(timeout) - - for { - - if ok, _ := tb.TryConsume(count); ok { - return true - } - - if remTime < backoffInterval { - time.Sleep(remTime) - } else { - time.Sleep(backoffInterval) - } - - now := time.Now() - if !now.Before(expiryTime) { - return false - } - - remTime = expiryTime.Sub(now) - } -} - -func (tb *tokenBucketImpl) reset(rps int) { - tb.Lock() - tb.fillInterval = 100 * time.Millisecond - tb.fillRate = (rps * 100) / millisPerSecond - tb.overflowRps = rps - (10 * tb.fillRate) - tb.nextOverflowRefillTime = time.Time{} - tb.Unlock() -} - -func (tb *tokenBucketImpl) refill(now time.Time) { - tb.refillOverFlow(now) - if tb.isRefillDue(now) { - tb.tokens = tb.fillRate - if tb.overflowTokens > 0 { - tb.tokens++ - tb.overflowTokens-- - } - tb.nextRefillTime = now.Add(tb.fillInterval) - } -} - -func (tb *tokenBucketImpl) refillOverFlow(now time.Time) { - if tb.overflowRps < 1 { - return - } - if tb.isOverflowRefillDue(now) { - tb.overflowTokens = tb.overflowRps - tb.nextOverflowRefillTime = now.Add(time.Second) - } -} - -func (tb *tokenBucketImpl) isRefillDue(now time.Time) bool { - return !now.Before(tb.nextRefillTime) -} - -func (tb *tokenBucketImpl) isOverflowRefillDue(now time.Time) bool { - return !now.Before(tb.nextOverflowRefillTime) -} - -// NewDynamicTokenBucket creates and returns a token bucket -// rate limiter that supports dynamic change of RPS. Thread safe. 
-// @param rps -// Dynamic config function for rate per second -func NewDynamicTokenBucket(rps dynamicconfig.IntPropertyFn, timeSource clock.TimeSource) TokenBucket { - initialRPS := rps() - return &dynamicTokenBucketImpl{ - rps: rps, - currentRPS: int32(initialRPS), - tb: newTokenBucket(initialRPS, timeSource), - } -} - -func (dtb *dynamicTokenBucketImpl) TryConsume(count int) (bool, time.Duration) { - dtb.resetRateIfChanged(dtb.rps()) - return dtb.tb.TryConsume(count) -} - -func (dtb *dynamicTokenBucketImpl) Consume(count int, timeout time.Duration) bool { - dtb.resetRateIfChanged(dtb.rps()) - return dtb.tb.Consume(count, timeout) -} - -// resetLimitIfChanged resets the underlying token bucket if the -// current rps quota is different from the actual rps quota obtained -// from dynamic config -func (dtb *dynamicTokenBucketImpl) resetRateIfChanged(newRPS int) { - currentRPS := atomic.LoadInt32(&dtb.currentRPS) - if int(currentRPS) == newRPS { - return - } - if atomic.CompareAndSwapInt32(&dtb.currentRPS, currentRPS, int32(newRPS)) { - dtb.tb.reset(newRPS) - } -} - -// NewPriorityTokenBucket creates and returns a -// new token bucket rate limiter support priority. -// There are n buckets for n priorities. It -// replenishes the top priority bucket every 100 -// milliseconds, unused tokens flows to next bucket. -// The idea comes from Dual Token Bucket Algorithms. -// Thread safe. -// -// @param numOfPriority -// Number of priorities -// @param rps -// Desired rate per second -// -func NewPriorityTokenBucket(numOfPriority, rps int, timeSource clock.TimeSource) PriorityTokenBucket { - tb := new(priorityTokenBucketImpl) - tb.tokens = make([]int, numOfPriority) - tb.timeSource = timeSource - tb.fillInterval = time.Millisecond * 100 - tb.fillRate = (rps * 100) / millisPerSecond - tb.overflowRps = rps - (10 * tb.fillRate) - tb.refill(time.Now()) - return tb -} - -// NewFullPriorityTokenBucket creates and returns a new priority token bucket with all bucket init with full tokens. -// With all buckets full, get tokens from low priority buckets won't be missed initially, but may caused bursts. 
-func NewFullPriorityTokenBucket(numOfPriority, rps int, timeSource clock.TimeSource) PriorityTokenBucket { - tb := new(priorityTokenBucketImpl) - tb.tokens = make([]int, numOfPriority) - tb.timeSource = timeSource - tb.fillInterval = time.Millisecond * 100 - tb.fillRate = (rps * 100) / millisPerSecond - tb.overflowRps = rps - (10 * tb.fillRate) - tb.refill(time.Now()) - for i := 1; i < numOfPriority; i++ { - tb.nextRefillTime = time.Time{} - tb.refill(time.Now()) - } - return tb -} - -func (tb *priorityTokenBucketImpl) GetToken(priority, count int) (bool, time.Duration) { - now := tb.timeSource.Now() - tb.Lock() - tb.refill(now) - nextRefillTime := tb.nextRefillTime.Sub(now) - if tb.tokens[priority] < count { - tb.Unlock() - return false, nextRefillTime - } - tb.tokens[priority] -= count - tb.Unlock() - return true, nextRefillTime -} - -func (tb *priorityTokenBucketImpl) refill(now time.Time) { - tb.refillOverFlow(now) - if tb.isRefillDue(now) { - more := tb.fillRate - for i := 0; i < len(tb.tokens); i++ { - tb.tokens[i] += more - if tb.tokens[i] > tb.fillRate { - more = tb.tokens[i] - tb.fillRate - tb.tokens[i] = tb.fillRate - } else { - break - } - } - if tb.overflowTokens > 0 { - tb.tokens[0]++ - tb.overflowTokens-- - } - tb.nextRefillTime = now.Add(tb.fillInterval) - } -} - -func (tb *priorityTokenBucketImpl) refillOverFlow(now time.Time) { - if tb.overflowRps < 1 { - return - } - if tb.isOverflowRefillDue(now) { - tb.overflowTokens = tb.overflowRps - tb.nextOverflowRefillTime = now.Add(time.Second) - } -} - -func (tb *priorityTokenBucketImpl) isRefillDue(now time.Time) bool { - return !now.Before(tb.nextRefillTime) -} - -func (tb *priorityTokenBucketImpl) isOverflowRefillDue(now time.Time) bool { - return !now.Before(tb.nextOverflowRefillTime) -} diff --git a/common/tokenbucket/tb_test.go b/common/tokenbucket/tb_test.go deleted file mode 100644 index 8cb339f02a7..00000000000 --- a/common/tokenbucket/tb_test.go +++ /dev/null @@ -1,232 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. 
- -package tokenbucket - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - - "go.temporal.io/server/common/service/dynamicconfig" -) - -type ( - TokenBucketSuite struct { - *require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error - suite.Suite - } -) - -func TestTokenBucketSuite(t *testing.T) { - suite.Run(t, new(TokenBucketSuite)) -} - -func (s *TokenBucketSuite) SetupTest() { - s.Assertions = require.New(s.T()) // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil -} - -type mockTimeSource struct { - currTime time.Time -} - -func (ts *mockTimeSource) Now() time.Time { - return ts.currTime -} - -func (ts *mockTimeSource) advance(d time.Duration) { - ts.currTime = ts.currTime.Add(d) -} - -func (s *TokenBucketSuite) TestRpsEnforced() { - ts := &mockTimeSource{currTime: time.Now().UTC()} - tb := New(99, ts) - for i := 0; i < 2; i++ { - total, attempts := s.testRpsEnforcedHelper(tb, ts, 11, 90, 10) - s.Equal(90, total, "Token bucket failed to enforce limit") - s.Equal(9, attempts, "Token bucket gave out tokens too quickly") - ts.advance(time.Millisecond * 101) - ok, _ := tb.TryConsume(9) - s.True(ok, "Token bucket failed to enforce limit") - ok, _ = tb.TryConsume(1) - s.False(ok, "Token bucket failed to enforce limit") - ts.advance(time.Second) - } -} - -func (s *TokenBucketSuite) TestLowRpsEnforced() { - ts := &mockTimeSource{currTime: time.Now().UTC()} - tb := New(3, ts) - - total, attempts := s.testRpsEnforcedHelper(tb, ts, 10, 3, 1) - s.Equal(3, total, "Token bucket failed to enforce limit") - s.Equal(3, attempts, "Token bucket gave out tokens too quickly") -} - -func (s *TokenBucketSuite) TestDynamicRpsEnforced() { - rpsConfigFn, rpsPtr := s.getTestRPSConfigFn(99) - ts := &mockTimeSource{currTime: time.Now().UTC()} - dtb := NewDynamicTokenBucket(rpsConfigFn, ts) - total, attempts := s.testRpsEnforcedHelper(dtb, ts, 11, 90, 10) - s.Equal(90, total, "Token bucket failed to enforce limit") - s.Equal(9, attempts, "Token bucket gave out tokens too quickly") - ts.advance(time.Second) - - *rpsPtr = 3 - total, attempts = s.testRpsEnforcedHelper(dtb, ts, 10, 3, 1) - s.Equal(3, total, "Token bucket failed to enforce limit") - s.Equal(3, attempts, "Token bucket gave out tokens too quickly") -} - -func (s *TokenBucketSuite) testRpsEnforcedHelper(tb TokenBucket, ts *mockTimeSource, maxAttempts, tokenNeeded, consumeRate int) (total, attempts int) { - total = 0 - attempts = 1 - for ; attempts < maxAttempts+1; attempts++ { - for c := 0; c < 2; c++ { - if ok, _ := tb.TryConsume(consumeRate); ok { - total += consumeRate - } - } - if total >= tokenNeeded { - break - } - ts.advance(time.Millisecond * 101) - } - return -} - -func (s *TokenBucketSuite) getTestRPSConfigFn(defaultValue int) (dynamicconfig.IntPropertyFn, *int) { - rps := defaultValue - return func(_ ...dynamicconfig.FilterOption) int { - return rps - }, &rps -} - -func (s *TokenBucketSuite) TestPriorityRpsEnforced() { - ts := &mockTimeSource{currTime: time.Now().UTC()} - tb := NewPriorityTokenBucket(1, 99, ts) // behavior same to tokenBucketImpl - - for i := 0; i < 2; i++ { - total := 0 - attempts := 1 - for ; attempts < 11; attempts++ { - for c := 0; c < 2; c++ { - if ok, _ := tb.GetToken(0, 10); ok { - total += 10 - } - } - - if total >= 90 { - break - } - ts.advance(time.Millisecond * 101) - } - s.Equal(90, total, "Token bucket 
failed to enforce limit") - s.Equal(9, attempts, "Token bucket gave out tokens too quickly") - - ts.advance(time.Millisecond * 101) - ok, _ := tb.GetToken(0, 9) - s.True(ok, "Token bucket failed to enforce limit") - ok, _ = tb.GetToken(0, 1) - s.False(ok, "Token bucket failed to enforce limit") - ts.advance(time.Second) - } -} - -func (s *TokenBucketSuite) TestPriorityLowRpsEnforced() { - ts := &mockTimeSource{currTime: time.Now().UTC()} - tb := NewPriorityTokenBucket(1, 3, ts) // behavior same to tokenBucketImpl - - total := 0 - attempts := 1 - for ; attempts < 10; attempts++ { - for c := 0; c < 2; c++ { - if ok, _ := tb.GetToken(0, 1); ok { - total++ - } - } - if total >= 3 { - break - } - ts.advance(time.Millisecond * 101) - } - s.Equal(3, total, "Token bucket failed to enforce limit") - s.Equal(3, attempts, "Token bucket gave out tokens too quickly") -} - -func (s *TokenBucketSuite) TestPriorityTokenBucket() { - ts := &mockTimeSource{currTime: time.Now().UTC()} - tb := NewPriorityTokenBucket(2, 100, ts) - - for i := 0; i < 2; i++ { - ok2, _ := tb.GetToken(1, 1) - s.False(ok2) - ok, _ := tb.GetToken(0, 10) - s.True(ok) - ts.advance(time.Millisecond * 101) - } - - for i := 0; i < 2; i++ { - ok, _ := tb.GetToken(0, 9) - s.True(ok) // 1 token remaining in 1st bucket, 0 in 2nd - ok2, _ := tb.GetToken(1, 1) - s.False(ok2) - ts.advance(time.Millisecond * 101) - ok2, _ = tb.GetToken(1, 2) - s.False(ok2) - ok2, _ = tb.GetToken(1, 1) - s.True(ok2) - } -} - -func (s *TokenBucketSuite) TestFullPriorityTokenBucket() { - ts := &mockTimeSource{currTime: time.Now().UTC()} - tb := NewFullPriorityTokenBucket(2, 100, ts) - - ok2, _ := tb.GetToken(1, 10) - s.True(ok2) - - for i := 0; i < 2; i++ { - ok2, _ := tb.GetToken(1, 1) - s.False(ok2) - ok, _ := tb.GetToken(0, 10) - s.True(ok) - ts.advance(time.Millisecond * 101) - } - - ok2, _ = tb.GetToken(1, 1) - s.False(ok2) - ts.advance(time.Millisecond * 101) - ok2, _ = tb.GetToken(1, 5) - s.True(ok2) - ts.advance(time.Millisecond * 101) - ok2, _ = tb.GetToken(1, 15) - s.False(ok2) - ok2, _ = tb.GetToken(1, 10) - s.True(ok2) - ok, _ := tb.GetToken(0, 10) - s.True(ok) -} diff --git a/tools/cli/adminElasticSearchCommands.go b/tools/cli/adminElasticSearchCommands.go index e704903ff97..82c3c44e9a1 100644 --- a/tools/cli/adminElasticSearchCommands.go +++ b/tools/cli/adminElasticSearchCommands.go @@ -40,11 +40,10 @@ import ( enumsspb "go.temporal.io/server/api/enums/v1" indexerspb "go.temporal.io/server/api/indexer/v1" - "go.temporal.io/server/common/clock" "go.temporal.io/server/common/codec" es "go.temporal.io/server/common/elasticsearch" "go.temporal.io/server/common/elasticsearch/esql" - "go.temporal.io/server/common/tokenbucket" + "go.temporal.io/server/common/quotas" ) const ( @@ -186,7 +185,9 @@ func AdminDelete(c *cli.Context) { inputFileName := getRequiredOption(c, FlagInputFile) batchSize := c.Int(FlagBatchSize) rps := c.Int(FlagRPS) - ratelimiter := tokenbucket.New(rps, clock.NewRealTimeSource()) + ratelimiter := quotas.NewDefaultOutgoingDynamicRateLimiter( + func() float64 { return float64(rps) }, + ) // This is only executed from the CLI by an admin user // #nosec @@ -202,10 +203,7 @@ func AdminDelete(c *cli.Context) { bulkRequest := esClient.Bulk() bulkConductFn := func() { - ok, waitTime := ratelimiter.TryConsume(1) - if !ok { - time.Sleep(waitTime) - } + _ = ratelimiter.Wait(context.Background()) err := bulkRequest.Do(context.Background()) if err != nil { ErrorAndExit(fmt.Sprintf("Bulk failed, current processed row %d", i), err)
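The CLI side replaces the manual TryConsume/time.Sleep backoff with a single blocking Wait on the new limiter; the discarded error can only arise from context cancellation or deadline expiry (or, presumably, a request larger than the limiter's burst), none of which applies to the background context used here. A minimal sketch of the same pattern; the limiter below is a golang.org/x/time/rate stand-in for the quotas limiter AdminDelete builds from the --rps flag:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// stand-in for quotas.NewDefaultOutgoingDynamicRateLimiter(func() float64 { return float64(rps) })
	limiter := rate.NewLimiter(rate.Limit(2), 1)
	ctx := context.Background()
	start := time.Now()
	for i := 0; i < 4; i++ {
		// Wait blocks until a token is available (or ctx is done), replacing
		// the old TryConsume + time.Sleep(waitTime) polling loop.
		if err := limiter.Wait(ctx); err != nil {
			fmt.Println("rate limiter wait aborted:", err)
			return
		}
		fmt.Printf("bulk batch %d at %s\n", i, time.Since(start).Round(100*time.Millisecond))
	}
}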