Skip to content

Commit

Permalink
Add server controlled task queue max dispatch rate (#1294)
Browse files Browse the repository at this point in the history
* Add admin dynamic config controlled task queue max rate limit
  • Loading branch information
wxing1292 authored Feb 17, 2021
1 parent d6ed02d commit de1a010
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 18 deletions.
49 changes: 49 additions & 0 deletions common/service/dynamicconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ type FloatPropertyFn func(opts ...FilterOption) float64
// FloatPropertyFnWithShardIDFilter is a wrapper to get float property from dynamic config with shardID as filter.
// It takes the shardID to filter on and returns the resolved float64 value.
type FloatPropertyFnWithShardIDFilter func(shardID int32) float64

// FloatPropertyFnWithNamespaceFilter is a wrapper to get float property from dynamic config with namespace as filter.
// It takes the namespace name to filter on and returns the resolved float64 value.
type FloatPropertyFnWithNamespaceFilter func(namespace string) float64

// FloatPropertyFnWithTaskQueueInfoFilters is a wrapper to get float property from dynamic config with three filters: namespace, taskQueue, taskType.
// It takes the namespace name, the task queue name and the task queue type, and returns the resolved float64 value.
type FloatPropertyFnWithTaskQueueInfoFilters func(namespace string, taskQueue string, taskType enumspb.TaskQueueType) float64

// DurationPropertyFn is a wrapper to get duration property from dynamic config.
// It accepts zero or more filter options and returns the resolved time.Duration value.
type DurationPropertyFn func(opts ...FilterOption) time.Duration

Expand Down Expand Up @@ -260,6 +266,49 @@ func (c *Collection) GetFloat64PropertyFilteredByShardID(key Key, defaultValue f
}
}

// GetFloatPropertyFilteredByNamespace returns a lookup function that resolves the
// float property stored under key, using the namespace as the only filter.
//
// The returned function asks the dynamic config client for the value, passing
// defaultValue along. A lookup error is logged but not propagated: whatever
// value the client returned is logged and handed back to the caller.
func (c *Collection) GetFloatPropertyFilteredByNamespace(key Key, defaultValue float64) FloatPropertyFnWithNamespaceFilter {
	lookup := func(namespace string) float64 {
		filters := getFilterMap(NamespaceFilter(namespace))
		result, lookupErr := c.client.GetFloatValue(key, filters, defaultValue)
		if lookupErr != nil {
			c.logError(key, lookupErr)
		}
		c.logValue(key, result, defaultValue, float64CompareEquals)
		return result
	}
	return lookup
}

// GetFloatPropertyFilteredByTaskQueueInfo gets property with taskQueueInfo as filters and asserts that it's a float.
//
// The returned function tries two filter sets in order, most specific first:
//  1. namespace + task queue name + task type
//  2. namespace + task queue name
//
// It stops at the first lookup whose value differs from defaultValue. Lookup
// errors are logged but not propagated; the last value obtained from the
// client is logged and returned.
func (c *Collection) GetFloatPropertyFilteredByTaskQueueInfo(key Key, defaultValue float64) FloatPropertyFnWithTaskQueueInfoFilters {
	return func(namespace string, taskQueue string, taskType enumspb.TaskQueueType) float64 {
		// Candidate filter sets, built up front and ordered from most to least specific.
		candidates := [...]map[Filter]interface{}{
			getFilterMap(NamespaceFilter(namespace), TaskQueueFilter(taskQueue), TaskTypeFilter(taskType)),
			getFilterMap(NamespaceFilter(namespace), TaskQueueFilter(taskQueue)),
		}

		resolved := defaultValue
		for i := 0; i < len(candidates); i++ {
			value, lookupErr := c.client.GetFloatValue(key, candidates[i], defaultValue)
			if lookupErr != nil {
				c.logError(key, lookupErr)
			}

			// Keep the client's answer even on error, then stop as soon as it
			// is something other than the default.
			resolved = value
			if resolved != defaultValue {
				break
			}
		}

		c.logValue(key, resolved, defaultValue, float64CompareEquals)
		return resolved
	}
}

// GetDurationProperty gets property and asserts that it's a duration
func (c *Collection) GetDurationProperty(key Key, defaultValue time.Duration) DurationPropertyFn {
return func(opts ...FilterOption) time.Duration {
Expand Down
10 changes: 10 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ var keys = map[Key]string{
testGetBoolPropertyFilteredByNamespaceIDKey: "testGetBoolPropertyFilteredByNamespaceIDKey",
testGetBoolPropertyFilteredByTaskQueueInfoKey: "testGetBoolPropertyFilteredByTaskQueueInfoKey",

// admin settings
// NOTE: admin settings are not guaranteed to be compatible across different versions
AdminMatchingDispatchRate: "admin.matchingDispatchRate",
AdminMatchingTaskqueueDispatchRate: "admin.matchingTaskqueueDispatchRate",

// system settings
EnableVisibilitySampling: "system.enableVisibilitySampling",
AdvancedVisibilityWritingMode: "system.advancedVisibilityWritingMode",
Expand Down Expand Up @@ -325,6 +330,11 @@ const (
testGetBoolPropertyFilteredByNamespaceIDKey
testGetBoolPropertyFilteredByTaskQueueInfoKey

// AdminMatchingDispatchRate is the max qps of any task queue for a given namespace
AdminMatchingDispatchRate
// AdminMatchingTaskqueueDispatchRate is the max qps of a single task queue, resolved by namespace, task queue name and task type
AdminMatchingTaskqueueDispatchRate

// EnableVisibilitySampling is key for enable visibility sampling
EnableVisibilitySampling
// AdvancedVisibilityWritingMode is key for how to write to advanced visibility
Expand Down
27 changes: 23 additions & 4 deletions service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type (
MaxTaskBatchSize dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters

ThrottledLogRPS dynamicconfig.IntPropertyFn

AdminMatchingDispatchRate dynamicconfig.FloatPropertyFnWithNamespaceFilter
AdminMatchingTaskqueueDispatchRate dynamicconfig.FloatPropertyFnWithTaskQueueInfoFilters
}

forwarderConfig struct {
Expand All @@ -90,6 +93,9 @@ type (
MaxTaskBatchSize func() int
NumWritePartitions func() int
NumReadPartitions func() int

AdminDispatchRate func() float64
AdminQueuePartitionDispatchRate func() float64
}
)

Expand Down Expand Up @@ -118,6 +124,9 @@ func NewConfig(dc *dynamicconfig.Collection) *Config {
ForwarderMaxRatePerSecond: dc.GetIntPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingForwarderMaxRatePerSecond, 10),
ForwarderMaxChildrenPerNode: dc.GetIntPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingForwarderMaxChildrenPerNode, 20),
ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.MatchingShutdownDrainDuration, 0),

AdminMatchingDispatchRate: dc.GetFloatPropertyFilteredByNamespace(dynamicconfig.AdminMatchingDispatchRate, 1000000),
AdminMatchingTaskqueueDispatchRate: dc.GetFloatPropertyFilteredByTaskQueueInfo(dynamicconfig.AdminMatchingTaskqueueDispatchRate, 4000),
}
}

Expand All @@ -130,6 +139,14 @@ func newTaskQueueConfig(id *taskQueueID, config *Config, namespaceCache cache.Na
namespace := namespaceEntry.GetInfo().Name
taskQueueName := id.name
taskType := id.taskType

writePartition := func() int {
return common.MaxInt(1, config.NumTaskqueueWritePartitions(namespace, taskQueueName, taskType))
}
readPartition := func() int {
return common.MaxInt(1, config.NumTaskqueueReadPartitions(namespace, taskQueueName, taskType))
}

return &taskQueueConfig{
RangeSize: config.RangeSize,
GetTasksBatchSize: func() int {
Expand Down Expand Up @@ -162,11 +179,13 @@ func newTaskQueueConfig(id *taskQueueID, config *Config, namespaceCache cache.Na
MaxTaskBatchSize: func() int {
return config.MaxTaskBatchSize(namespace, taskQueueName, taskType)
},
NumWritePartitions: func() int {
return common.MaxInt(1, config.NumTaskqueueWritePartitions(namespace, taskQueueName, taskType))
NumWritePartitions: writePartition,
NumReadPartitions: readPartition,
AdminDispatchRate: func() float64 {
return config.AdminMatchingDispatchRate(namespace)
},
NumReadPartitions: func() int {
return common.MaxInt(1, config.NumTaskqueueReadPartitions(namespace, taskQueueName, taskType))
AdminQueuePartitionDispatchRate: func() float64 {
return config.AdminMatchingTaskqueueDispatchRate(namespace, taskQueueName, taskType) / float64(readPartition())
},
forwarderConfig: forwarderConfig{
ForwarderMaxOutstandingPolls: func() int {
Expand Down
24 changes: 16 additions & 8 deletions service/matching/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,19 @@ const (
func newTaskMatcher(config *taskQueueConfig, fwdr *Forwarder, scopeFunc func() metrics.Scope) *TaskMatcher {
dynamicRate := quotas.NewDynamicRate(defaultTaskDispatchRPS)
dynamicBurst := quotas.NewDynamicBurst(int(defaultTaskDispatchRPS))
limiter := quotas.NewDynamicRateLimiter(
dynamicRate.RateFn(),
dynamicBurst.BurstFn(),
defaultTaskDispatchRPSTTL,
)
limiter := quotas.NewMultiStageRateLimiter([]quotas.RateLimiter{
quotas.NewDynamicRateLimiter(
dynamicRate.RateFn(),
dynamicBurst.BurstFn(),
defaultTaskDispatchRPSTTL,
),
quotas.NewDefaultOutgoingDynamicRateLimiter(
config.AdminQueuePartitionDispatchRate,
),
quotas.NewDefaultOutgoingDynamicRateLimiter(
config.AdminDispatchRate,
),
})
return &TaskMatcher{
config: config,
dynamicRate: dynamicRate,
Expand Down Expand Up @@ -304,10 +312,10 @@ func (tm *TaskMatcher) UpdateRatelimit(rps *float64) {
}

rate := *rps
nPartitions := tm.numPartitions()
if rate > float64(nPartitions) {
nPartitions := float64(tm.numPartitions())
if rate > nPartitions {
// divide the rate equally across all partitions
rate = rate / float64(tm.numPartitions())
rate = rate / nPartitions
}

burst := int(rate)
Expand Down
2 changes: 1 addition & 1 deletion service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func (s *matchingEngineSuite) TestAddThenConsumeActivities() {

func (s *matchingEngineSuite) TestSyncMatchActivities() {
// Set a short long poll expiration so we don't have to wait too long for 0 throttling cases
s.matchingEngine.config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueueInfo(50 * time.Millisecond)
s.matchingEngine.config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueueInfo(2 * time.Second)

runID := uuid.NewRandom().String()
workflowID := "workflow1"
Expand Down
3 changes: 0 additions & 3 deletions service/matching/taskQueueManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,6 @@ func TestDescribeTaskQueue(t *testing.T) {
require.Zero(t, taskQueueStatus.GetAckLevel())
require.Equal(t, taskCount, taskQueueStatus.GetReadLevel())
require.Equal(t, taskCount, taskQueueStatus.GetBacklogCountHint())
require.True(t, taskQueueStatus.GetRatePerSecond() > (defaultTaskDispatchRPS-1))
require.True(t, taskQueueStatus.GetRatePerSecond() < (defaultTaskDispatchRPS+1))
taskIDBlock := taskQueueStatus.GetTaskIdBlock()
require.Equal(t, int64(1), taskIDBlock.GetStartId())
require.Equal(t, tlm.config.RangeSize, taskIDBlock.GetEndId())
Expand All @@ -228,7 +226,6 @@ func TestDescribeTaskQueue(t *testing.T) {
require.Equal(t, 1, len(descResp.GetPollers()))
require.Equal(t, PollerIdentity, descResp.Pollers[0].GetIdentity())
require.NotEmpty(t, descResp.Pollers[0].GetLastAccessTime())
require.True(t, descResp.Pollers[0].GetRatePerSecond() > (defaultTaskDispatchRPS-1))

rps := 5.0
tlm.pollerHistory.updatePollerInfo(pollerIdentity(PollerIdentity), &rps)
Expand Down
7 changes: 5 additions & 2 deletions service/matching/taskReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package matching

import (
"context"
"runtime"
"time"

enumsspb "go.temporal.io/server/api/enums/v1"
Expand All @@ -38,6 +37,10 @@ import (
"go.temporal.io/server/service/worker/scanner/taskqueue"
)

// taskReaderOfferThrottleWait is how long the task reader's dispatch loop
// sleeps after an unexpected dispatch error before continuing.
const taskReaderOfferThrottleWait = time.Second

type (
taskReader struct {
taskBuffer chan *persistencespb.AllocatedTaskInfo // tasks loaded from persistence
Expand Down Expand Up @@ -110,7 +113,7 @@ dispatchLoop:
// this should never happen unless there is a bug - don't drop the task
tr.scope().IncCounter(metrics.BufferThrottlePerTaskQueueCounter)
tr.logger().Error("taskReader: unexpected error dispatching task", tag.Error(err))
runtime.Gosched()
time.Sleep(taskReaderOfferThrottleWait)
}
case <-tr.dispatcherShutdownC:
break dispatchLoop
Expand Down

0 comments on commit de1a010

Please sign in to comment.