diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 3fae6486d94..a0c47c3d3e9 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -346,6 +346,7 @@ Usage of vttablet: --tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100) --tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells --tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica) + --tx-throttler-topo-refresh-interval duration The rate that the transaction throttler will refresh the topology to find cells. (default 5m0s) --tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9") --tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler. --unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s) diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index ee42fd513bc..6584bacd353 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -167,6 +167,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { flagutil.DualFormatStringListVar(fs, ¤tConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.") fs.IntVar(¤tConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information") fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.") + fs.DurationVar(¤tConfig.TxThrottlerTopoRefreshInterval, "tx-throttler-topo-refresh-interval", time.Minute*5, "The rate that the transaction throttler will refresh the topology to find cells.") fs.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.") fs.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.") @@ -335,11 +336,12 @@ type TabletConfig struct { TwoPCCoordinatorAddress string `json:"-"` TwoPCAbandonAge Seconds `json:"-"` - EnableTxThrottler bool `json:"-"` - TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"` - TxThrottlerHealthCheckCells []string `json:"-"` - TxThrottlerDefaultPriority int `json:"-"` - TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"` + EnableTxThrottler bool `json:"-"` + TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"` + TxThrottlerHealthCheckCells []string `json:"-"` + TxThrottlerDefaultPriority int `json:"-"` + TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"` + TxThrottlerTopoRefreshInterval time.Duration `json:"-"` EnableLagThrottler bool `json:"-"` @@ -534,9 +536,6 @@ func (c *TabletConfig) verifyTxThrottlerConfig() error { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err) } - if len(c.TxThrottlerHealthCheckCells) == 0 { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells) - } if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 { return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v) } @@ -619,11 +618,12 @@ var defaultConfig = TabletConfig{ DeprecatedCacheResultFields: true, SignalWhenSchemaChange: true, - EnableTxThrottler: false, - TxThrottlerConfig: defaultTxThrottlerConfig(), - TxThrottlerHealthCheckCells: []string{}, - TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle - TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}, + EnableTxThrottler: false, + TxThrottlerConfig: defaultTxThrottlerConfig(), + TxThrottlerHealthCheckCells: []string{}, + TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle + TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}, + TxThrottlerTopoRefreshInterval: time.Minute * 5, EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 79c9091d077..1eae5218d2a 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -376,13 +376,6 @@ func TestVerifyTxThrottlerConfig(t *testing.T) { EnableTxThrottler: true, TxThrottlerConfig: &TxThrottlerConfigFlag{invalidMaxReplicationLagModuleConfig}, }, - { - // enabled without cells defined - Name: "enabled without cells", - ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION, - EnableTxThrottler: true, - TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig}, - }, { // enabled with good config (default/replica tablet type) Name: "enabled", diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index e25e4c0da89..961430ad560 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -19,6 +19,7 @@ package txthrottler import ( "context" "math/rand" + "reflect" "strings" "sync" "time" @@ -101,6 +102,17 @@ type TopologyWatcherInterface interface { // go/vt/throttler.GlobalManager. const TxThrottlerName = "TransactionThrottler" +// fetchKnownCells gathers a list of known cells from the topology. On error, +// the cell of the local tablet will be used and an error is logged. +func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) []string { + cells, err := topoServer.GetKnownCells(ctx) + if err != nil { + log.Errorf("txThrottler: falling back to local cell due to error fetching cells from topology: %+v", err) + cells = []string{target.Cell} + } + return cells +} + // txThrottler implements TxThrottle for throttling transactions based on replication lag. // It's a thin wrapper around the throttler found in vitess/go/vt/throttler. // It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler. @@ -165,6 +177,9 @@ type txThrottlerConfig struct { // tabletTypes stores the tablet types for throttling tabletTypes map[topodatapb.TabletType]bool + + // rate to refresh topo for cells + topoRefreshInterval time.Duration } // txThrottlerState holds the state of an open TxThrottler object. @@ -174,12 +189,15 @@ type txThrottlerState struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler ThrottlerInterface - stopHealthCheck context.CancelFunc + throttleMu sync.Mutex + throttler ThrottlerInterface + stopHealthCheck context.CancelFunc + topologyWatchers map[string]TopologyWatcherInterface healthCheck discovery.HealthCheck - topologyWatchers map[string]TopologyWatcherInterface + healthCheckChan chan *discovery.TabletHealth + healthCheckCells []string + cellsFromTopo bool } // NewTxThrottler tries to construct a txThrottler from the @@ -201,10 +219,11 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { } throttlerConfig = &txThrottlerConfig{ - enabled: true, - tabletTypes: tabletTypes, - throttlerConfig: env.Config().TxThrottlerConfig.Get(), - healthCheckCells: healthCheckCells, + enabled: true, + healthCheckCells: healthCheckCells, + tabletTypes: tabletTypes, + throttlerConfig: env.Config().TxThrottlerConfig.Get(), + topoRefreshInterval: env.Config().TxThrottlerTopoRefreshInterval, } defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig) @@ -301,44 +320,91 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta return nil, err } state := &txThrottlerState{ - config: config, - throttler: t, - txThrottler: txThrottler, + config: config, + healthCheckCells: config.healthCheckCells, + throttler: t, + txThrottler: txThrottler, } - createTxThrottlerHealthCheck(txThrottler.topoServer, config, state, target.Cell) - - state.topologyWatchers = make( - map[string]TopologyWatcherInterface, len(config.healthCheckCells)) - for _, cell := range config.healthCheckCells { - state.topologyWatchers[cell] = topologyWatcherFactory( - txThrottler.topoServer, - state.healthCheck, + + // get cells from topo if none defined in tabletenv config + if len(state.healthCheckCells) == 0 { + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target) + state.cellsFromTopo = true + } + + ctx, cancel := context.WithCancel(context.Background()) + state.stopHealthCheck = cancel + state.initHealthCheckStream(txThrottler.topoServer, target) + go state.healthChecksProcessor(ctx, txThrottler.topoServer, target) + + return state, nil +} + +func (ts *txThrottlerState) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) { + ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) + ts.healthCheckChan = ts.healthCheck.Subscribe() + + ts.topologyWatchers = make( + map[string]TopologyWatcherInterface, len(ts.healthCheckCells)) + for _, cell := range ts.healthCheckCells { + ts.topologyWatchers[cell] = topologyWatcherFactory( + topoServer, + ts.healthCheck, cell, target.Keyspace, target.Shard, discovery.DefaultTopologyWatcherRefreshInterval, discovery.DefaultTopoReadConcurrency, ) - txThrottler.topoWatchers.Add(cell, 1) + ts.txThrottler.topoWatchers.Add(cell, 1) } - return state, nil } -func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) { - ctx, cancel := context.WithCancel(context.Background()) - result.stopHealthCheck = cancel - result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells) - ch := result.healthCheck.Subscribe() - go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case th := <-ch: - result.StatsUpdate(th) - } +func (ts *txThrottlerState) closeHealthCheckStream() { + if ts.healthCheck == nil { + return + } + for cell, watcher := range ts.topologyWatchers { + watcher.Stop() + ts.txThrottler.topoWatchers.Reset(cell) + } + ts.topologyWatchers = nil + ts.stopHealthCheck() + ts.healthCheck.Close() +} + +func (ts *txThrottlerState) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + + knownCells := fetchKnownCells(fetchCtx, topoServer, target) + if !reflect.DeepEqual(knownCells, ts.healthCheckCells) { + log.Info("txThrottler: restarting healthcheck stream due to topology cells update") + ts.healthCheckCells = knownCells + ts.closeHealthCheckStream() + ts.initHealthCheckStream(topoServer, target) + } +} + +func (ts *txThrottlerState) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { + var cellsUpdateTicks <-chan time.Time + if ts.cellsFromTopo { + ticker := time.NewTicker(ts.config.topoRefreshInterval) + cellsUpdateTicks = ticker.C + defer ticker.Stop() + } + for { + select { + case <-ctx.Done(): + return + case <-cellsUpdateTicks: + ts.updateHealthCheckCells(ctx, topoServer, target) + case th := <-ts.healthCheckChan: + ts.StatsUpdate(th) } - }(ctx) + } } func (ts *txThrottlerState) throttle() bool { @@ -353,16 +419,8 @@ func (ts *txThrottlerState) throttle() bool { } func (ts *txThrottlerState) deallocateResources() { - // We don't really need to nil out the fields here - // as deallocateResources is not expected to be called - // more than once, but it doesn't hurt to do so. - for cell, watcher := range ts.topologyWatchers { - watcher.Stop() - ts.txThrottler.topoWatchers.Reset(cell) - } - ts.topologyWatchers = nil - - ts.healthCheck.Close() + // Close healthcheck and topo watchers + ts.closeHealthCheckStream() ts.healthCheck = nil // After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 9c9c725e1fd..5d7089cc6ed 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -22,6 +22,7 @@ package txthrottler //go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( + "context" "testing" "time" @@ -112,7 +113,6 @@ func TestEnabledThrottler(t *testing.T) { config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true - config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"} config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} env := tabletenv.NewEnv(config, t.Name()) @@ -161,6 +161,19 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts()) } +func TestFetchKnownCells(t *testing.T) { + { + ts := memorytopo.NewServer("cell1", "cell2") + cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"}) + assert.Equal(t, []string{"cell1", "cell2"}, cells) + } + { + ts := memorytopo.NewServer() + cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"}) + assert.Equal(t, []string{"cell1"}, cells) + } +} + func TestNewTxThrottler(t *testing.T) { config := tabletenv.NewDefaultConfig() env := tabletenv.NewEnv(config, t.Name())