diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 56aed7a459b..cd3b9194de7 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -163,12 +163,11 @@ type Throttler struct { lastCheckTimeNano int64 - initMutex sync.Mutex - enableMutex sync.Mutex - cancelOpenContext context.CancelFunc - cancelEnableContext context.CancelFunc - throttledAppsMutex sync.Mutex - watchSrvKeyspaceOnce sync.Once + initMutex sync.Mutex + enableMutex sync.Mutex + cancelOpenContext context.CancelFunc + cancelEnableContext context.CancelFunc + throttledAppsMutex sync.Mutex nonLowPriorityAppRequestsThrottled *cache.Cache httpClient *http.Client @@ -353,9 +352,9 @@ func (throttler *Throttler) applyThrottlerConfig(ctx context.Context, throttlerC throttler.ThrottleApp(appRule.Name, protoutil.TimeFromProto(appRule.ExpiresAt).UTC(), appRule.Ratio, appRule.Exempt) } if throttlerConfig.Enabled { - go throttler.Enable(ctx) + go throttler.Enable() } else { - go throttler.Disable(ctx) + go throttler.Disable() } } @@ -383,18 +382,18 @@ func (throttler *Throttler) IsRunning() bool { // Enable activates the throttler probes; when enabled, the throttler responds to check queries based on // the collected metrics. -func (throttler *Throttler) Enable(ctx context.Context) bool { +func (throttler *Throttler) Enable() bool { throttler.enableMutex.Lock() defer throttler.enableMutex.Unlock() - isEnabled := throttler.isEnabled.Swap(true) - if isEnabled { + if wasEnabled := throttler.isEnabled.Swap(true); wasEnabled { log.Infof("Throttler: already enabled") return false } log.Infof("Throttler: enabling") - ctx, throttler.cancelEnableContext = context.WithCancel(ctx) + var ctx context.Context + ctx, throttler.cancelEnableContext = context.WithCancel(context.Background()) throttler.check.SelfChecks(ctx) throttler.Operate(ctx) @@ -406,12 +405,11 @@ func (throttler *Throttler) Enable(ctx context.Context) bool { // Disable deactivates the probes and associated operations. When disabled, the throttler reponds to check // queries with "200 OK" irrespective of lag or any other metrics. -func (throttler *Throttler) Disable(ctx context.Context) bool { +func (throttler *Throttler) Disable() bool { throttler.enableMutex.Lock() defer throttler.enableMutex.Unlock() - isEnabled := throttler.isEnabled.Swap(false) - if !isEnabled { + if wasEnabled := throttler.isEnabled.Swap(false); !wasEnabled { log.Infof("Throttler: already disabled") return false } @@ -426,6 +424,54 @@ func (throttler *Throttler) Disable(ctx context.Context) bool { return true } +// retryReadAndApplyThrottlerConfig() is called by Open(), read throttler config from topo, applies it, and starts watching +// for topo changes. +// But also, we're in an Open() function, which blocks state manager's operation, and affects +// opening of all other components. We thus read the throttler config in the background. +// However, we want to handle a situation where the read errors out. +// So we kick a loop that keeps retrying reading the config, for as long as this throttler is open. +func (throttler *Throttler) retryReadAndApplyThrottlerConfig(ctx context.Context) { + var watchSrvKeyspaceOnce sync.Once + retryInterval := 10 * time.Second + retryTicker := time.NewTicker(retryInterval) + defer retryTicker.Stop() + for { + if !throttler.IsOpen() { + // Throttler is not open so no need to keep retrying. + log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting") + return + } + + requestCtx, requestCancel := context.WithTimeout(ctx, 5*time.Second) + defer requestCancel() + throttlerConfig, err := throttler.readThrottlerConfig(requestCtx) + if err == nil { + log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig) + // It's possible that during a retry-sleep, the throttler is closed and opened again, leading + // to two (or more) instances of this goroutine. That's not a big problem; it's fine if all + // attempt to read the throttler config; but we just want to ensure they don't step on each other + // while applying the changes. + throttler.initMutex.Lock() + defer throttler.initMutex.Unlock() + throttler.applyThrottlerConfig(ctx, throttlerConfig) // may issue an Enable + go watchSrvKeyspaceOnce.Do(func() { + // We start watching SrvKeyspace only after we know it's been created. Now is that time! + // We watch using the given ctx, which is cancelled when the throttler is Close()d. + throttler.srvTopoServer.WatchSrvKeyspace(ctx, throttler.cell, throttler.keyspace, throttler.WatchSrvKeyspaceCallback) + }) + return + } + log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): error reading throttler config. Will retry in %v. Err=%+v", retryInterval, err) + select { + case <-ctx.Done(): + // Throttler is not open so no need to keep retrying. + log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting") + return + case <-retryTicker.C: + } + } +} + // Open opens database pool and initializes the schema func (throttler *Throttler) Open() error { log.Infof("Throttler: started execution of Open. Acquiring initMutex lock") @@ -450,52 +496,7 @@ func (throttler *Throttler) Open() error { throttler.ThrottleApp("always-throttled-app", time.Now().Add(time.Hour*24*365*10), DefaultThrottleRatio, false) - log.Infof("Throttler: throttler-config-via-topo detected") - // We want to read throttler config from topo and apply it. - // But also, we're in an Open() function, which blocks state manager's operation, and affects - // opening of all other components. We thus read the throttler config in the background. - // However, we want to handle a situation where the read errors out. - // So we kick a loop that keeps retrying reading the config, for as long as this throttler is open. - retryReadAndApplyThrottlerConfig := func(ctx context.Context) { - retryInterval := 10 * time.Second - retryTicker := time.NewTicker(retryInterval) - defer retryTicker.Stop() - for { - if !throttler.IsOpen() { - // Throttler is not open so no need to keep retrying. - log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting") - return - } - - requestCtx, requestCancel := context.WithTimeout(ctx, 5*time.Second) - defer requestCancel() - throttlerConfig, err := throttler.readThrottlerConfig(requestCtx) - if err == nil { - log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig) - // It's possible that during a retry-sleep, the throttler is closed and opened again, leading - // to two (or more) instances of this goroutine. That's not a big problem; it's fine if all - // attempt to read the throttler config; but we just want to ensure they don't step on each other - // while applying the changes. - throttler.initMutex.Lock() - defer throttler.initMutex.Unlock() - throttler.applyThrottlerConfig(ctx, throttlerConfig) // may issue an Enable - go throttler.watchSrvKeyspaceOnce.Do(func() { - // We start watching SrvKeyspace only after we know it's been created. Now is that time! - throttler.srvTopoServer.WatchSrvKeyspace(context.Background(), throttler.cell, throttler.keyspace, throttler.WatchSrvKeyspaceCallback) - }) - return - } - log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): error reading throttler config. Will retry in %v. Err=%+v", retryInterval, err) - select { - case <-ctx.Done(): - // Throttler is not open so no need to keep retrying. - log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting") - return - case <-retryTicker.C: - } - } - } - go retryReadAndApplyThrottlerConfig(ctx) + go throttler.retryReadAndApplyThrottlerConfig(ctx) return nil } @@ -511,8 +512,7 @@ func (throttler *Throttler) Close() { log.Infof("Throttler: throttler is not open") return } - ctx := context.Background() - throttler.Disable(ctx) + throttler.Disable() throttler.isLeader.Store(false) log.Infof("Throttler: closing pool") @@ -608,10 +608,9 @@ func (throttler *Throttler) Operate(ctx context.Context) { throttledAppsTicker := addTicker(throttledAppsSnapshotInterval) recentCheckTicker := addTicker(time.Second) - tmClient := tmclient.NewTabletManagerClient() - go func() { defer log.Infof("Throttler: Operate terminated, tickers stopped") + tmClient := tmclient.NewTabletManagerClient() defer tmClient.Close() for _, t := range tickers { defer t.Stop()