Skip to content

Commit

Permalink
Watch throttler config per Open(); Enable() creates own context
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Oct 4, 2023
1 parent 2d7b093 commit 5e3bd0d
Showing 1 changed file with 64 additions and 65 deletions.
129 changes: 64 additions & 65 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,11 @@ type Throttler struct {

lastCheckTimeNano int64

initMutex sync.Mutex
enableMutex sync.Mutex
cancelOpenContext context.CancelFunc
cancelEnableContext context.CancelFunc
throttledAppsMutex sync.Mutex
watchSrvKeyspaceOnce sync.Once
initMutex sync.Mutex
enableMutex sync.Mutex
cancelOpenContext context.CancelFunc
cancelEnableContext context.CancelFunc
throttledAppsMutex sync.Mutex

nonLowPriorityAppRequestsThrottled *cache.Cache
httpClient *http.Client
Expand Down Expand Up @@ -353,9 +352,9 @@ func (throttler *Throttler) applyThrottlerConfig(ctx context.Context, throttlerC
throttler.ThrottleApp(appRule.Name, protoutil.TimeFromProto(appRule.ExpiresAt).UTC(), appRule.Ratio, appRule.Exempt)
}
if throttlerConfig.Enabled {
go throttler.Enable(ctx)
go throttler.Enable()
} else {
go throttler.Disable(ctx)
go throttler.Disable()
}
}

Expand Down Expand Up @@ -383,18 +382,18 @@ func (throttler *Throttler) IsRunning() bool {

// Enable activates the throttler probes; when enabled, the throttler responds to check queries based on
// the collected metrics.
func (throttler *Throttler) Enable(ctx context.Context) bool {
func (throttler *Throttler) Enable() bool {
throttler.enableMutex.Lock()
defer throttler.enableMutex.Unlock()

isEnabled := throttler.isEnabled.Swap(true)
if isEnabled {
if wasEnabled := throttler.isEnabled.Swap(true); wasEnabled {
log.Infof("Throttler: already enabled")
return false
}
log.Infof("Throttler: enabling")

ctx, throttler.cancelEnableContext = context.WithCancel(ctx)
var ctx context.Context
ctx, throttler.cancelEnableContext = context.WithCancel(context.Background())
throttler.check.SelfChecks(ctx)
throttler.Operate(ctx)

Expand All @@ -406,12 +405,11 @@ func (throttler *Throttler) Enable(ctx context.Context) bool {

// Disable deactivates the probes and associated operations. When disabled, the throttler reponds to check
// queries with "200 OK" irrespective of lag or any other metrics.
func (throttler *Throttler) Disable(ctx context.Context) bool {
func (throttler *Throttler) Disable() bool {
throttler.enableMutex.Lock()
defer throttler.enableMutex.Unlock()

isEnabled := throttler.isEnabled.Swap(false)
if !isEnabled {
if wasEnabled := throttler.isEnabled.Swap(false); !wasEnabled {
log.Infof("Throttler: already disabled")
return false
}
Expand All @@ -426,6 +424,54 @@ func (throttler *Throttler) Disable(ctx context.Context) bool {
return true
}

// retryReadAndApplyThrottlerConfig() is called by Open(), read throttler config from topo, applies it, and starts watching
// for topo changes.
// But also, we're in an Open() function, which blocks state manager's operation, and affects
// opening of all other components. We thus read the throttler config in the background.
// However, we want to handle a situation where the read errors out.
// So we kick a loop that keeps retrying reading the config, for as long as this throttler is open.
func (throttler *Throttler) retryReadAndApplyThrottlerConfig(ctx context.Context) {
var watchSrvKeyspaceOnce sync.Once
retryInterval := 10 * time.Second
retryTicker := time.NewTicker(retryInterval)
defer retryTicker.Stop()
for {
if !throttler.IsOpen() {
// Throttler is not open so no need to keep retrying.
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting")
return
}

requestCtx, requestCancel := context.WithTimeout(ctx, 5*time.Second)
defer requestCancel()
throttlerConfig, err := throttler.readThrottlerConfig(requestCtx)
if err == nil {
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig)
// It's possible that during a retry-sleep, the throttler is closed and opened again, leading
// to two (or more) instances of this goroutine. That's not a big problem; it's fine if all
// attempt to read the throttler config; but we just want to ensure they don't step on each other
// while applying the changes.
throttler.initMutex.Lock()
defer throttler.initMutex.Unlock()
throttler.applyThrottlerConfig(ctx, throttlerConfig) // may issue an Enable
go watchSrvKeyspaceOnce.Do(func() {
// We start watching SrvKeyspace only after we know it's been created. Now is that time!
// We watch using the given ctx, which is cancelled when the throttler is Close()d.
throttler.srvTopoServer.WatchSrvKeyspace(ctx, throttler.cell, throttler.keyspace, throttler.WatchSrvKeyspaceCallback)
})
return
}
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): error reading throttler config. Will retry in %v. Err=%+v", retryInterval, err)
select {
case <-ctx.Done():
// Throttler is not open so no need to keep retrying.
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting")
return
case <-retryTicker.C:
}
}
}

// Open opens database pool and initializes the schema
func (throttler *Throttler) Open() error {
log.Infof("Throttler: started execution of Open. Acquiring initMutex lock")
Expand All @@ -450,52 +496,7 @@ func (throttler *Throttler) Open() error {

throttler.ThrottleApp("always-throttled-app", time.Now().Add(time.Hour*24*365*10), DefaultThrottleRatio, false)

log.Infof("Throttler: throttler-config-via-topo detected")
// We want to read throttler config from topo and apply it.
// But also, we're in an Open() function, which blocks state manager's operation, and affects
// opening of all other components. We thus read the throttler config in the background.
// However, we want to handle a situation where the read errors out.
// So we kick a loop that keeps retrying reading the config, for as long as this throttler is open.
retryReadAndApplyThrottlerConfig := func(ctx context.Context) {
retryInterval := 10 * time.Second
retryTicker := time.NewTicker(retryInterval)
defer retryTicker.Stop()
for {
if !throttler.IsOpen() {
// Throttler is not open so no need to keep retrying.
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting")
return
}

requestCtx, requestCancel := context.WithTimeout(ctx, 5*time.Second)
defer requestCancel()
throttlerConfig, err := throttler.readThrottlerConfig(requestCtx)
if err == nil {
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): success reading throttler config: %+v", throttlerConfig)
// It's possible that during a retry-sleep, the throttler is closed and opened again, leading
// to two (or more) instances of this goroutine. That's not a big problem; it's fine if all
// attempt to read the throttler config; but we just want to ensure they don't step on each other
// while applying the changes.
throttler.initMutex.Lock()
defer throttler.initMutex.Unlock()
throttler.applyThrottlerConfig(ctx, throttlerConfig) // may issue an Enable
go throttler.watchSrvKeyspaceOnce.Do(func() {
// We start watching SrvKeyspace only after we know it's been created. Now is that time!
throttler.srvTopoServer.WatchSrvKeyspace(context.Background(), throttler.cell, throttler.keyspace, throttler.WatchSrvKeyspaceCallback)
})
return
}
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): error reading throttler config. Will retry in %v. Err=%+v", retryInterval, err)
select {
case <-ctx.Done():
// Throttler is not open so no need to keep retrying.
log.Errorf("Throttler.retryReadAndApplyThrottlerConfig(): throttler no longer seems to be open, exiting")
return
case <-retryTicker.C:
}
}
}
go retryReadAndApplyThrottlerConfig(ctx)
go throttler.retryReadAndApplyThrottlerConfig(ctx)

return nil
}
Expand All @@ -511,8 +512,7 @@ func (throttler *Throttler) Close() {
log.Infof("Throttler: throttler is not open")
return
}
ctx := context.Background()
throttler.Disable(ctx)
throttler.Disable()
throttler.isLeader.Store(false)

log.Infof("Throttler: closing pool")
Expand Down Expand Up @@ -608,10 +608,9 @@ func (throttler *Throttler) Operate(ctx context.Context) {
throttledAppsTicker := addTicker(throttledAppsSnapshotInterval)
recentCheckTicker := addTicker(time.Second)

tmClient := tmclient.NewTabletManagerClient()

go func() {
defer log.Infof("Throttler: Operate terminated, tickers stopped")
tmClient := tmclient.NewTabletManagerClient()
defer tmClient.Close()
for _, t := range tickers {
defer t.Stop()
Expand Down

0 comments on commit 5e3bd0d

Please sign in to comment.