Skip to content

Commit

Permalink
Fix transaction throttler ignoring the initial rate (#12618)
Browse files Browse the repository at this point in the history
* Fix transaction throttler ignoring the initial rate

This addresses the issue reported in #12549

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Add missing override of max replication lag in `throttler.newThrottler()`

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Reorder functions to make diff easier to read

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix check for maxRate in `newThrottlerFromConfig()`

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix some CI pipeline issues

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Address PR comment.

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Fix typo

Signed-off-by: Eduardo J. Ortega U <[email protected]>

---------

Signed-off-by: Eduardo J. Ortega U <[email protected]>
Signed-off-by: Eduardo J. Ortega U. <[email protected]>
  • Loading branch information
ejortegau authored Mar 29, 2023
1 parent 197ae82 commit 39f83b9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 12 deletions.
26 changes: 19 additions & 7 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,31 @@ func NewThrottler(name, unit string, threadCount int, maxRate, maxReplicationLag
return newThrottler(GlobalManager, name, unit, threadCount, maxRate, maxReplicationLag, time.Now)
}

func NewThrottlerFromConfig(name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) {
return newThrottlerFromConfig(GlobalManager, name, unit, threadCount, maxRateModuleMaxRate, maxReplicationLagModuleConfig, nowFunc)
}

func newThrottler(manager *managerImpl, name, unit string, threadCount int, maxRate, maxReplicationLag int64, nowFunc func() time.Time) (*Throttler, error) {
// Verify input parameters.
if maxRate < 0 {
return nil, fmt.Errorf("maxRate must be >= 0: %v", maxRate)
config := NewMaxReplicationLagModuleConfig(maxReplicationLag)
config.MaxReplicationLagSec = maxReplicationLag

return newThrottlerFromConfig(manager, name, unit, threadCount, maxRate, config, nowFunc)

}

func newThrottlerFromConfig(manager *managerImpl, name, unit string, threadCount int, maxRateModuleMaxRate int64, maxReplicationLagModuleConfig MaxReplicationLagModuleConfig, nowFunc func() time.Time) (*Throttler, error) {
err := maxReplicationLagModuleConfig.Verify()
if err != nil {
return nil, fmt.Errorf("invalid max replication lag config: %w", err)
}
if maxReplicationLag < 0 {
return nil, fmt.Errorf("maxReplicationLag must be >= 0: %v", maxReplicationLag)
if maxRateModuleMaxRate < 0 {
return nil, fmt.Errorf("maxRate must be >= 0: %v", maxRateModuleMaxRate)
}

// Enable the configured modules.
maxRateModule := NewMaxRateModule(maxRate)
maxRateModule := NewMaxRateModule(maxRateModuleMaxRate)
actualRateHistory := newAggregatedIntervalHistory(1024, 1*time.Second, threadCount)
maxReplicationLagModule, err := NewMaxReplicationLagModule(NewMaxReplicationLagModuleConfig(maxReplicationLag), actualRateHistory, nowFunc)
maxReplicationLagModule, err := NewMaxReplicationLagModule(maxReplicationLagModuleConfig, actualRateHistory, nowFunc)
if err != nil {
return nil, err
}
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ type txThrottlerState struct {
// in tests to generate mocks.
type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error)
type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)

var (
healthCheckFactory healthCheckFactoryFunc
Expand All @@ -210,8 +210,8 @@ func resetTxThrottlerFactories() {
topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency)
}
throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) {
return throttler.NewThrottler(name, unit, threadCount, maxRate, maxReplicationLag)
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
}
}

Expand Down Expand Up @@ -285,12 +285,15 @@ func (t *TxThrottler) Throttle() (result bool) {
}

func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
TxThrottlerName,
"TPS", /* unit */
1, /* threadCount */
throttler.MaxRateModuleDisabled, /* maxRate */
config.throttlerConfig.MaxReplicationLagSec /* maxReplicationLag */)
maxReplicationLagModuleConfig,
)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestEnabledThrottler(t *testing.T) {
}

mockThrottler := NewMockThrottlerInterface(mockCtrl)
throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) {
throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
assert.Equal(t, 1, threadCount)
return mockThrottler, nil
}
Expand Down

0 comments on commit 39f83b9

Please sign in to comment.