Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txthrottler: remove txThrottlerConfig struct, rely on tabletenv #13624

119 changes: 44 additions & 75 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,7 @@ func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *query
// be executing a method. The only exception is the 'Throttle' method where multiple goroutines are
// allowed to execute it concurrently.
type txThrottler struct {
// config stores the transaction throttler's configuration.
// It is populated in NewTxThrottler and is not modified
// since.
config *txThrottlerConfig
config *tabletenv.TabletConfig

// state holds an open transaction throttler state. It is nil
// if the TransactionThrottler is closed.
Expand All @@ -162,30 +159,6 @@ type txThrottler struct {
requestsThrottled *stats.CountersWithSingleLabel
}

// txThrottlerConfig holds the parameters that need to be
// passed when constructing a TxThrottler object.
type txThrottlerConfig struct {
// enabled is true if the transaction throttler is enabled. All methods
// of a disabled transaction throttler do nothing and Throttle() always
// returns false.
enabled bool

// if dryRun is true, the txThrottler will run only on monitoring mode, meaning that it will increase counters for
// total and actually throttled requests, but it will not actually return that a transaction should be throttled.
dryRun bool

throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
healthCheckCells []string

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

// rate to refresh topo for cells
topoRefreshInterval time.Duration
}

type txThrottlerState interface {
deallocateResources()
StatsUpdate(tabletStats *discovery.TabletHealth)
Expand All @@ -194,7 +167,7 @@ type txThrottlerState interface {

// txThrottlerStateImpl holds the state of an open TxThrottler object.
type txThrottlerStateImpl struct {
config *txThrottlerConfig
config *tabletenv.TabletConfig
txThrottler *txThrottler

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
Expand All @@ -208,49 +181,38 @@ type txThrottlerStateImpl struct {
healthCheckChan chan *discovery.TabletHealth
healthCheckCells []string
cellsFromTopo bool

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool
}

// NewTxThrottler tries to construct a txThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
// This function calls tryCreateTxThrottler that does the actual creation work
// and returns an error if one occurred.
// NewTxThrottler tries to construct a txThrottler from the relevant
// fields in the tabletenv.Env and topo.Server objects.
func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
throttlerConfig := &txThrottlerConfig{enabled: false}

if env.Config().EnableTxThrottler {
// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := env.Config().TxThrottlerHealthCheckCells

tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes))
for _, tabletType := range *env.Config().TxThrottlerTabletTypes {
tabletTypes[tabletType] = true
config := env.Config()
if config.EnableTxThrottler {
if len(config.TxThrottlerHealthCheckCells) == 0 {
defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, cellsFromTopo: true, topoRefreshInterval: %s, throttlerConfig: %q",
config.TxThrottlerTabletTypes, config.TxThrottlerTopoRefreshInterval, config.TxThrottlerConfig.Get(),
)
} else {
defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q",
config.TxThrottlerTabletTypes, config.TxThrottlerHealthCheckCells, config.TxThrottlerConfig.Get(),
)
}

throttlerConfig = &txThrottlerConfig{
enabled: true,
healthCheckCells: healthCheckCells,
dryRun: env.Config().TxThrottlerDryRun,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
topoRefreshInterval: env.Config().TxThrottlerTopoRefreshInterval,
}

defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig)
}

return &txThrottler{
config: throttlerConfig,
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read",
throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that this is not a breaking change because TxThrottlerName == "TransactionThrottler"

topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read",
[]string{"cell", "DbType"}),
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded",
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded",
[]string{"cell", "DbType"}),
requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"),
requestsTotal: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Requests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Throttled", "transaction throttler requests throttled", "workload"),
}
}

Expand All @@ -261,7 +223,7 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) {

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
func (t *txThrottler) Open() (err error) {
if !t.config.enabled {
if !t.config.EnableTxThrottler {
return nil
}
if t.state != nil {
Expand All @@ -277,7 +239,7 @@ func (t *txThrottler) Open() (err error) {
// It should be called after the throttler is no longer needed.
// It's ok to call this method on a closed throttler--in which case the method does nothing.
func (t *txThrottler) Close() {
if !t.config.enabled {
if !t.config.EnableTxThrottler {
return
}
if t.state == nil {
Expand All @@ -294,7 +256,7 @@ func (t *txThrottler) Close() {
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
if !t.config.enabled {
if !t.config.EnableTxThrottler {
return false
}
if t.state == nil {
Expand All @@ -310,11 +272,11 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
t.requestsThrottled.Add(workload, 1)
}

return result && !t.config.dryRun
return result && !t.config.TxThrottlerDryRun
}

func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}
func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfig, target *querypb.Target) (txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.TxThrottlerConfig.Get()}

t, err := throttlerFactory(
TxThrottlerName,
Expand All @@ -326,13 +288,20 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
if err != nil {
return nil, err
}
if err := t.UpdateConfiguration(config.throttlerConfig, true /* copyZeroValues */); err != nil {
if err := t.UpdateConfiguration(config.TxThrottlerConfig.Get(), true /* copyZeroValues */); err != nil {
t.Close()
return nil, err
}

tabletTypes := make(map[topodatapb.TabletType]bool, len(*config.TxThrottlerTabletTypes))
for _, tabletType := range *config.TxThrottlerTabletTypes {
tabletTypes[tabletType] = true
}

state := &txThrottlerStateImpl{
config: config,
healthCheckCells: config.healthCheckCells,
healthCheckCells: config.TxThrottlerHealthCheckCells,
tabletTypes: tabletTypes,
throttler: t,
txThrottler: txThrottler,
}
Expand Down Expand Up @@ -402,7 +371,7 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topo
func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
var cellsUpdateTicks <-chan time.Time
if ts.cellsFromTopo {
ticker := time.NewTicker(ts.config.topoRefreshInterval)
ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval)
cellsUpdateTicks = ticker.C
defer ticker.Stop()
}
Expand All @@ -420,7 +389,7 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS

func (ts *txThrottlerStateImpl) throttle() bool {
if ts.throttler == nil {
log.Error("throttle called after deallocateResources was called")
log.Error("txThrottler: throttle called after deallocateResources was called")
return false
}
// Serialize calls to ts.throttle.Throttle()
Expand All @@ -442,7 +411,7 @@ func (ts *txThrottlerStateImpl) deallocateResources() {

// StatsUpdate updates the health of a tablet with the given healthcheck.
func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) {
if ts.config.tabletTypes == nil {
if len(ts.tabletTypes) == 0 {
return
}

Expand All @@ -451,8 +420,8 @@ func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth)
ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1)

// Monitor tablets for replication lag if they have a tablet
// type specified by the --tx_throttler_tablet_types flag.
if ts.config.tabletTypes[tabletType] {
// type specified by the --tx-throttler-tablet-types flag.
if ts.tabletTypes[tabletType] {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1)
}
Expand Down
35 changes: 5 additions & 30 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func TestEnabledThrottler(t *testing.T) {
})

assert.Nil(t, throttlerImpl.Open())
throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl)
assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes)
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts())

Expand Down Expand Up @@ -174,33 +176,6 @@ func TestFetchKnownCells(t *testing.T) {
}
}

func TestNewTxThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())

{
// disabled
config.EnableTxThrottler = false
throttler := NewTxThrottler(env, nil)
throttlerImpl, _ := throttler.(*txThrottler)
assert.NotNil(t, throttlerImpl)
assert.NotNil(t, throttlerImpl.config)
assert.False(t, throttlerImpl.config.enabled)
}
{
// enabled
config.EnableTxThrottler = true
config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"}
config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}
throttler := NewTxThrottler(env, nil)
throttlerImpl, _ := throttler.(*txThrottler)
assert.NotNil(t, throttlerImpl)
assert.NotNil(t, throttlerImpl.config)
assert.True(t, throttlerImpl.config.enabled)
assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.config.healthCheckCells)
}
}

func TestDryRunThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())
Expand All @@ -222,9 +197,9 @@ func TestDryRunThrottler(t *testing.T) {

t.Run(theTestCase.Name, func(t *testing.T) {
aTxThrottler := &txThrottler{
config: &txThrottlerConfig{
enabled: true,
dryRun: theTestCase.throttlerDryRun,
config: &tabletenv.TabletConfig{
EnableTxThrottler: true,
TxThrottlerDryRun: theTestCase.throttlerDryRun,
},
state: &mockTxThrottlerState{shouldThrottle: theTestCase.txThrottlerStateShouldThrottle},
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
Expand Down
Loading