Skip to content

Commit

Permalink
Deprecate token bucket (#1253)
Browse files Browse the repository at this point in the history
* Update visibilitySamplingClient using quotas rate limiter
* Fix multi stage rate limiter allow function
  • Loading branch information
wxing1292 authored Feb 3, 2021
1 parent f3e33a9 commit 10884f0
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 679 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *VisibilitySamplingSuite) SetupTest() {
s.persistence = &mocks.VisibilityManager{}
config := &c.VisibilityConfig{
VisibilityOpenMaxQPS: dynamicconfig.GetIntPropertyFilteredByNamespace(1),
VisibilityClosedMaxQPS: dynamicconfig.GetIntPropertyFilteredByNamespace(10),
VisibilityClosedMaxQPS: dynamicconfig.GetIntPropertyFilteredByNamespace(2),
VisibilityListMaxQPS: dynamicconfig.GetIntPropertyFilteredByNamespace(1),
}
s.metricClient = &mmocks.Client{}
Expand Down
148 changes: 58 additions & 90 deletions common/persistence/visibilitySamplingClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,17 @@
package persistence

import (
"sync"

enumspb "go.temporal.io/api/enums/v1"

"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/service/config"
"go.temporal.io/server/common/tokenbucket"
)

const (
// To sample visibility request, open has only 1 bucket, closed has 2
numOfPriorityForOpen = 1
numOfPriorityForClosed = 2
numOfPriorityForList = 1
)

type visibilitySamplingClient struct {
rateLimitersForOpen *namespaceToBucketMap
rateLimitersForClosed *namespaceToBucketMap
rateLimitersForList *namespaceToBucketMap
rateLimitersForOpen quotas.NamespaceRateLimiter
rateLimitersForClosed quotas.NamespaceRateLimiter
rateLimitersForList quotas.NamespaceRateLimiter
persistence VisibilityManager
config *config.VisibilityConfig
metricClient metrics.Client
Expand All @@ -57,55 +45,57 @@ type visibilitySamplingClient struct {
var _ VisibilityManager = (*visibilitySamplingClient)(nil)

// NewVisibilitySamplingClient creates a client to manage visibility with sampling
func NewVisibilitySamplingClient(persistence VisibilityManager, config *config.VisibilityConfig, metricClient metrics.Client, logger log.Logger) VisibilityManager {
return &visibilitySamplingClient{
persistence: persistence,
rateLimitersForOpen: newNamespaceToBucketMap(),
rateLimitersForClosed: newNamespaceToBucketMap(),
rateLimitersForList: newNamespaceToBucketMap(),
config: config,
metricClient: metricClient,
logger: logger,
}
}

type namespaceToBucketMap struct {
sync.RWMutex
mappings map[string]tokenbucket.PriorityTokenBucket
}

func newNamespaceToBucketMap() *namespaceToBucketMap {
return &namespaceToBucketMap{
mappings: make(map[string]tokenbucket.PriorityTokenBucket),
}
}

func (m *namespaceToBucketMap) getRateLimiter(namespace string, numOfPriority, qps int) tokenbucket.PriorityTokenBucket {
m.RLock()
rateLimiter, exist := m.mappings[namespace]
m.RUnlock()

if exist {
return rateLimiter
func NewVisibilitySamplingClient(
persistence VisibilityManager,
config *config.VisibilityConfig,
metricClient metrics.Client,
logger log.Logger,
) VisibilityManager {

var persistenceRateLimiter []quotas.RateLimiter
if config.MaxQPS != nil {
persistenceRateLimiter = append(persistenceRateLimiter, quotas.NewDefaultIncomingDynamicRateLimiter(
func() float64 { return float64(config.MaxQPS()) },
))
}

m.Lock()
if rateLimiter, ok := m.mappings[namespace]; ok { // read again to ensure no duplicate create
m.Unlock()
return rateLimiter
return &visibilitySamplingClient{
persistence: persistence,
rateLimitersForOpen: quotas.NewNamespaceMultiStageRateLimiter(
func(namespace string) quotas.RateLimiter {
return quotas.NewDefaultOutgoingDynamicRateLimiter(
func() float64 { return float64(config.VisibilityOpenMaxQPS(namespace)) },
)
},
persistenceRateLimiter,
),
rateLimitersForClosed: quotas.NewNamespaceMultiStageRateLimiter(
func(namespace string) quotas.RateLimiter {
return quotas.NewDefaultOutgoingDynamicRateLimiter(
func() float64 { return float64(config.VisibilityClosedMaxQPS(namespace)) },
)
},
persistenceRateLimiter,
),
rateLimitersForList: quotas.NewNamespaceMultiStageRateLimiter(
func(namespace string) quotas.RateLimiter {
return quotas.NewDefaultOutgoingDynamicRateLimiter(
func() float64 { return float64(config.VisibilityListMaxQPS(namespace)) },
)
},
persistenceRateLimiter,
),
config: config,
metricClient: metricClient,
logger: logger,
}
rateLimiter = tokenbucket.NewFullPriorityTokenBucket(numOfPriority, qps, clock.NewRealTimeSource())
m.mappings[namespace] = rateLimiter
m.Unlock()
return rateLimiter
}

func (p *visibilitySamplingClient) RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error {
namespace := request.Namespace
namespaceID := request.NamespaceID

rateLimiter := p.rateLimitersForOpen.getRateLimiter(namespace, numOfPriorityForOpen, p.config.VisibilityOpenMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); ok {
if ok := p.rateLimitersForOpen.Allow(namespace); ok {
return p.persistence.RecordWorkflowExecutionStarted(request)
}

Expand All @@ -124,8 +114,7 @@ func (p *visibilitySamplingClient) RecordWorkflowExecutionStartedV2(request *Rec
namespace := request.Namespace
namespaceID := request.NamespaceID

rateLimiter := p.rateLimitersForOpen.getRateLimiter(namespace, numOfPriorityForOpen, p.config.VisibilityOpenMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); ok {
if ok := p.rateLimitersForOpen.Allow(namespace); ok {
return p.persistence.RecordWorkflowExecutionStartedV2(request)
}

Expand All @@ -143,10 +132,8 @@ func (p *visibilitySamplingClient) RecordWorkflowExecutionStartedV2(request *Rec
func (p *visibilitySamplingClient) RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error {
namespace := request.Namespace
namespaceID := request.NamespaceID
priority := getRequestPriority(request)

rateLimiter := p.rateLimitersForClosed.getRateLimiter(namespace, numOfPriorityForClosed, p.config.VisibilityClosedMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(priority, 1); ok {
if ok := p.rateLimitersForClosed.Allow(namespace); ok {
return p.persistence.RecordWorkflowExecutionClosed(request)
}

Expand All @@ -164,10 +151,8 @@ func (p *visibilitySamplingClient) RecordWorkflowExecutionClosed(request *Record
func (p *visibilitySamplingClient) RecordWorkflowExecutionClosedV2(request *RecordWorkflowExecutionClosedRequest) error {
namespace := request.Namespace
namespaceID := request.NamespaceID
priority := getRequestPriority(request)

rateLimiter := p.rateLimitersForClosed.getRateLimiter(namespace, numOfPriorityForClosed, p.config.VisibilityClosedMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(priority, 1); ok {
if ok := p.rateLimitersForClosed.Allow(namespace); ok {
return p.persistence.RecordWorkflowExecutionClosedV2(request)
}

Expand All @@ -186,8 +171,7 @@ func (p *visibilitySamplingClient) UpsertWorkflowExecution(request *UpsertWorkfl
namespace := request.Namespace
namespaceID := request.NamespaceID

rateLimiter := p.rateLimitersForClosed.getRateLimiter(namespace, numOfPriorityForClosed, p.config.VisibilityClosedMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); ok {
if ok := p.rateLimitersForClosed.Allow(namespace); ok {
return p.persistence.UpsertWorkflowExecution(request)
}

Expand All @@ -206,8 +190,7 @@ func (p *visibilitySamplingClient) UpsertWorkflowExecutionV2(request *UpsertWork
namespace := request.Namespace
namespaceID := request.NamespaceID

rateLimiter := p.rateLimitersForClosed.getRateLimiter(namespace, numOfPriorityForClosed, p.config.VisibilityClosedMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); ok {
if ok := p.rateLimitersForClosed.Allow(namespace); ok {
return p.persistence.UpsertWorkflowExecutionV2(request)
}

Expand All @@ -225,8 +208,7 @@ func (p *visibilitySamplingClient) UpsertWorkflowExecutionV2(request *UpsertWork
func (p *visibilitySamplingClient) ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
namespace := request.Namespace

rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); !ok {
if ok := p.rateLimitersForList.Allow(namespace); !ok {
return nil, ErrPersistenceLimitExceededForList
}

Expand All @@ -236,8 +218,7 @@ func (p *visibilitySamplingClient) ListOpenWorkflowExecutions(request *ListWorkf
func (p *visibilitySamplingClient) ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
namespace := request.Namespace

rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); !ok {
if ok := p.rateLimitersForList.Allow(namespace); !ok {
return nil, ErrPersistenceLimitExceededForList
}

Expand All @@ -247,8 +228,7 @@ func (p *visibilitySamplingClient) ListClosedWorkflowExecutions(request *ListWor
func (p *visibilitySamplingClient) ListOpenWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) {
namespace := request.Namespace

rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); !ok {
if ok := p.rateLimitersForList.Allow(namespace); !ok {
return nil, ErrPersistenceLimitExceededForList
}

Expand All @@ -258,8 +238,7 @@ func (p *visibilitySamplingClient) ListOpenWorkflowExecutionsByType(request *Lis
func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByType(request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) {
namespace := request.Namespace

rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); !ok {
if ok := p.rateLimitersForList.Allow(namespace); !ok {
return nil, ErrPersistenceLimitExceededForList
}

Expand All @@ -269,8 +248,7 @@ func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByType(request *L
func (p *visibilitySamplingClient) ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) {
namespace := request.Namespace

rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); !ok {
if ok := p.rateLimitersForList.Allow(namespace); !ok {
return nil, ErrPersistenceLimitExceededForList
}

Expand All @@ -280,8 +258,7 @@ func (p *visibilitySamplingClient) ListOpenWorkflowExecutionsByWorkflowID(reques
func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) {
namespace := request.Namespace

rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); !ok {
if ok := p.rateLimitersForList.Allow(namespace); !ok {
return nil, ErrPersistenceLimitExceededForList
}

Expand All @@ -291,8 +268,7 @@ func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByWorkflowID(requ
func (p *visibilitySamplingClient) ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error) {
namespace := request.Namespace

rateLimiter := p.rateLimitersForList.getRateLimiter(namespace, numOfPriorityForList, p.config.VisibilityListMaxQPS(namespace))
if ok, _ := rateLimiter.GetToken(0, 1); !ok {
if ok := p.rateLimitersForList.Allow(namespace); !ok {
return nil, ErrPersistenceLimitExceededForList
}

Expand Down Expand Up @@ -330,11 +306,3 @@ func (p *visibilitySamplingClient) Close() {
func (p *visibilitySamplingClient) GetName() string {
return p.persistence.GetName()
}

func getRequestPriority(request *RecordWorkflowExecutionClosedRequest) int {
priority := 0
if request.Status == enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED {
priority = 1 // low priority for completed workflows
}
return priority
}
6 changes: 5 additions & 1 deletion common/quotas/multi_stage_rate_limiter_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ func (rl *MultiStageRateLimiterImpl) AllowN(now time.Time, numToken int) bool {

for _, rateLimiter := range rl.rateLimiters {
reservation := rateLimiter.ReserveN(now, numToken)
if !reservation.OK() {
if !reservation.OK() || reservation.DelayFrom(now) > 0 {
if reservation.OK() {
reservation.CancelAt(now)
}

// cancel all existing reservation
for _, reservation := range reservations {
reservation.CancelAt(now)
Expand Down
24 changes: 23 additions & 1 deletion common/quotas/multi_stage_rate_limiter_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,35 @@ func (s *multiStageRateLimiterSuite) TestAllowN_NonSuccess() {
s.False(result)
}

func (s *multiStageRateLimiterSuite) TestAllowN_SomeSuccess() {
func (s *multiStageRateLimiterSuite) TestAllowN_SomeSuccess_Case1() {
now := time.Now()
numToken := 2

s.firstReservation.EXPECT().OK().Return(true).AnyTimes()
s.firstReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
s.firstReservation.EXPECT().CancelAt(now).Times(1)
s.firstRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.firstReservation).Times(1)

s.secondReservation.EXPECT().OK().Return(false).AnyTimes()
s.secondReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
s.secondRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.secondReservation).Times(1)

result := s.rateLimiter.AllowN(now, numToken)
s.False(result)
}

func (s *multiStageRateLimiterSuite) TestAllowN_SomeSuccess_Case2() {
now := time.Now()
numToken := 2

s.firstReservation.EXPECT().OK().Return(true).AnyTimes()
s.firstReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
s.firstReservation.EXPECT().CancelAt(now).Times(1)
s.firstRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.firstReservation).Times(1)

s.secondReservation.EXPECT().OK().Return(true).AnyTimes()
s.secondReservation.EXPECT().DelayFrom(now).Return(time.Duration(1)).AnyTimes()
s.secondReservation.EXPECT().CancelAt(now).Times(1)
s.secondRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.secondReservation).Times(1)

result := s.rateLimiter.AllowN(now, numToken)
Expand All @@ -143,9 +163,11 @@ func (s *multiStageRateLimiterSuite) TestAllowN_AllSuccess() {
numToken := 2

s.firstReservation.EXPECT().OK().Return(true).AnyTimes()
s.firstReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
s.firstRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.firstReservation).Times(1)

s.secondReservation.EXPECT().OK().Return(true).AnyTimes()
s.secondReservation.EXPECT().DelayFrom(now).Return(time.Duration(0)).AnyTimes()
s.secondRateLimiter.EXPECT().ReserveN(now, numToken).Return(s.secondReservation).Times(1)

result := s.rateLimiter.AllowN(now, numToken)
Expand Down
Loading

0 comments on commit 10884f0

Please sign in to comment.