Skip to content

Commit

Permalink
config: make tidb_enable_stats_owner controlling the stats owner (#55592
Browse files Browse the repository at this point in the history
)

close #55989
  • Loading branch information
hawkingrei authored Sep 27, 2024
1 parent c7fde05 commit c56694c
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 24 deletions.
8 changes: 5 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,9 +551,10 @@ type Instance struct {
PluginDir string `toml:"plugin_dir" json:"plugin_dir"`
PluginLoad string `toml:"plugin_load" json:"plugin_load"`
// MaxConnections is the maximum permitted number of simultaneous client connections.
MaxConnections uint32 `toml:"max_connections" json:"max_connections"`
TiDBEnableDDL AtomicBool `toml:"tidb_enable_ddl" json:"tidb_enable_ddl"`
TiDBRCReadCheckTS bool `toml:"tidb_rc_read_check_ts" json:"tidb_rc_read_check_ts"`
MaxConnections uint32 `toml:"max_connections" json:"max_connections"`
TiDBEnableDDL AtomicBool `toml:"tidb_enable_ddl" json:"tidb_enable_ddl"`
TiDBEnableStatsOwner AtomicBool `toml:"tidb_enable_stats_owner" json:"tidb_enable_stats_owner"`
TiDBRCReadCheckTS bool `toml:"tidb_rc_read_check_ts" json:"tidb_rc_read_check_ts"`
// TiDBServiceScope indicates the role for tidb for distributed task framework.
TiDBServiceScope string `toml:"tidb_service_scope" json:"tidb_service_scope"`
}
Expand Down Expand Up @@ -964,6 +965,7 @@ var defaultConf = Config{
PluginLoad: "",
MaxConnections: 0,
TiDBEnableDDL: *NewAtomicBool(true),
TiDBEnableStatsOwner: *NewAtomicBool(true),
TiDBRCReadCheckTS: false,
TiDBServiceScope: "",
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ func TestConflictInstanceConfig(t *testing.T) {
_, err = f.WriteString("check-mb4-value-in-utf8 = true \nrun-ddl = true \n" +
"[log] \nenable-slow-log = true \n" +
"[performance] \nforce-priority = \"NO_PRIORITY\"\n" +
"[instance] \ntidb_check_mb4_value_in_utf8 = false \ntidb_enable_slow_log = false \ntidb_force_priority = \"LOW_PRIORITY\"\ntidb_enable_ddl = false")
"[instance] \ntidb_check_mb4_value_in_utf8 = false \ntidb_enable_slow_log = false \ntidb_force_priority = \"LOW_PRIORITY\"\ntidb_enable_ddl = false\ntidb_enable_stats_owner = false")
require.NoError(t, err)
require.NoError(t, f.Sync())
err = conf.Load(configFile)
Expand All @@ -1080,6 +1080,7 @@ func TestConflictInstanceConfig(t *testing.T) {
require.Equal(t, "LOW_PRIORITY", conf.Instance.ForcePriority)
require.Equal(t, true, conf.RunDDL)
require.Equal(t, false, conf.Instance.TiDBEnableDDL.Load())
require.Equal(t, false, conf.Instance.TiDBEnableStatsOwner.Load())
require.Equal(t, 0, len(DeprecatedOptions))
for _, conflictOption := range ConflictOptions {
expectedConflictOption, ok := expectedConflictOptions[conflictOption.SectionName]
Expand Down
60 changes: 42 additions & 18 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ type Domain struct {

instancePlanCache sessionctx.InstancePlanCache // the instance level plan cache

statsOwner owner.Manager
// deferFn is used to release infoschema object lazily during v1 and v2 switch
deferFn
}
Expand Down Expand Up @@ -2313,20 +2314,22 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
if do.statsLease >= 0 {
do.wg.Run(do.loadStatsWorker, "loadStatsWorker")
}
owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
variable.EnableStatsOwner = do.enableStatsOwner
variable.DisableStatsOwner = do.disableStatsOwner
do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
do.wg.Run(func() {
do.indexUsageWorker()
}, "indexUsageWorker")
if do.statsLease <= 0 {
// For statsLease > 0, `updateStatsWorker` handles the quit of stats owner.
do.wg.Run(func() { quitStatsOwner(do, owner) }, "quitStatsOwner")
do.wg.Run(func() { quitStatsOwner(do, do.statsOwner) }, "quitStatsOwner")
return nil
}
do.SetStatsUpdating(true)
// The stats updated worker doesn't require the stats initialization to be completed.
// This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations.
// These tasks do not interfere with or depend on the initialization process.
do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }, "updateStatsWorker")
do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker")
do.wg.Run(func() {
do.handleDDLEvent()
}, "handleDDLEvent")
Expand All @@ -2339,7 +2342,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
do.autoAnalyzeWorker(owner)
do.autoAnalyzeWorker()
},
"autoAnalyzeWorker",
)
Expand All @@ -2350,7 +2353,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
do.analyzeJobsCleanupWorker(owner)
do.analyzeJobsCleanupWorker()
},
"analyzeJobsCleanupWorker",
)
Expand Down Expand Up @@ -2380,6 +2383,25 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
return nil
}

// enableStatsOwner enables this node to execute stats owner jobs.
// Since ownerManager.CampaignOwner will start a new goroutine to run ownerManager.campaignLoop,
// we should make sure that before invoking enableStatsOwner(), stats owner is DISABLE.
func (do *Domain) enableStatsOwner() error {
if !do.statsOwner.IsOwner() {
err := do.statsOwner.CampaignOwner()
return errors.Trace(err)
}
return nil
}

// disableStatsOwner disable this node to execute stats owner.
// We should make sure that before invoking disableStatsOwner(), stats owner is ENABLE.
func (do *Domain) disableStatsOwner() error {
// disable campaign by interrupting campaignLoop
do.statsOwner.CampaignCancel()
return nil
}

func quitStatsOwner(do *Domain, mgr owner.Manager) {
<-do.exit
mgr.Cancel()
Expand All @@ -2404,9 +2426,11 @@ func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
statsOwner = owner.NewOwnerManager(context.Background(), do.etcdClient, prompt, id, ownerKey)
}
// TODO: Need to do something when err is not nil.
err := statsOwner.CampaignOwner()
if err != nil {
logutil.BgLogger().Warn("campaign owner failed", zap.Error(err))
if ownerKey == handle.StatsOwnerKey && config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load() {
err := statsOwner.CampaignOwner()
if err != nil {
logutil.BgLogger().Warn("campaign owner failed", zap.Error(err))
}
}
return statsOwner
}
Expand Down Expand Up @@ -2493,15 +2517,15 @@ func (do *Domain) indexUsageWorker() {
}
}

func (*Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle, owner owner.Manager) {
func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle) {
ch := make(chan struct{}, 1)
timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
logutil.BgLogger().Info("updateStatsWorker is going to exit, start to flush stats")
statsHandle.FlushStats()
logutil.BgLogger().Info("updateStatsWorker ready to release owner")
owner.Cancel()
do.statsOwner.Cancel()
ch <- struct{}{}
}()
select {
Expand Down Expand Up @@ -2532,7 +2556,7 @@ func (do *Domain) handleDDLEvent() {
}
}

func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) {
func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
logutil.BgLogger().Info("updateStatsWorker started.")
lease := do.statsLease
Expand All @@ -2558,15 +2582,15 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) {
for {
select {
case <-do.exit:
do.updateStatsWorkerExitPreprocessing(statsHandle, owner)
do.updateStatsWorkerExitPreprocessing(statsHandle)
return
case <-deltaUpdateTicker.C:
err := statsHandle.DumpStatsDeltaToKV(false)
if err != nil {
logutil.BgLogger().Debug("dump stats delta failed", zap.Error(err))
}
case <-gcStatsTicker.C:
if !owner.IsOwner() {
if !do.statsOwner.IsOwner() {
continue
}
err := statsHandle.GCStats(do.InfoSchema(), do.GetSchemaLease())
Expand All @@ -2587,7 +2611,7 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) {
}
}

func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
func (do *Domain) autoAnalyzeWorker() {
defer util.Recover(metrics.LabelDomain, "autoAnalyzeWorker", nil, false)
statsHandle := do.StatsHandle()
analyzeTicker := time.NewTicker(do.statsLease)
Expand All @@ -2598,7 +2622,7 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
for {
select {
case <-analyzeTicker.C:
if variable.RunAutoAnalyze.Load() && !do.stopAutoAnalyze.Load() && owner.IsOwner() {
if variable.RunAutoAnalyze.Load() && !do.stopAutoAnalyze.Load() && do.statsOwner.IsOwner() {
statsHandle.HandleAutoAnalyze()
}
case <-do.exit:
Expand All @@ -2621,7 +2645,7 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
// It first retrieves the list of current analyze processes, then removes any analyze job
// that is not associated with a current process. Additionally, if the current instance is the owner,
// it also cleans up corrupted analyze jobs on dead instances.
func (do *Domain) analyzeJobsCleanupWorker(owner owner.Manager) {
func (do *Domain) analyzeJobsCleanupWorker() {
defer util.Recover(metrics.LabelDomain, "analyzeJobsCleanupWorker", nil, false)
// For GC.
const gcInterval = time.Hour
Expand All @@ -2642,7 +2666,7 @@ func (do *Domain) analyzeJobsCleanupWorker(owner owner.Manager) {
select {
case <-gcTicker.C:
// Only the owner should perform this operation.
if owner.IsOwner() {
if do.statsOwner.IsOwner() {
updateTime := time.Now().AddDate(0, 0, -daysToKeep)
err := statsHandle.DeleteAnalyzeJobs(updateTime)
if err != nil {
Expand All @@ -2666,7 +2690,7 @@ func (do *Domain) analyzeJobsCleanupWorker(owner owner.Manager) {
logutil.BgLogger().Warn("cleanup analyze jobs on current instance failed", zap.Error(err))
}

if owner.IsOwner() {
if do.statsOwner.IsOwner() {
err = statsHandle.CleanupCorruptedAnalyzeJobsOnDeadInstances()
if err != nil {
logutil.BgLogger().Warn("cleanup analyze jobs on dead instances failed", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions pkg/domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ func TestStatWorkRecoverFromPanic(t *testing.T) {
metrics.PanicCounter.Reset()
// Since the stats lease is 0 now, so create a new ticker will panic.
// Test that they can recover from panic correctly.
dom.updateStatsWorker(mock.NewContext(), nil)
dom.autoAnalyzeWorker(nil)
dom.updateStatsWorker(mock.NewContext())
dom.autoAnalyzeWorker()
counter := metrics.PanicCounter.WithLabelValues(metrics.LabelDomain)
pb := &dto.Metric{}
err = counter.Write(pb)
Expand Down
18 changes: 18 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,22 @@ var defaultSysVars = []*SysVar{
return BoolToOnOff(config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()), nil
},
},
{Scope: ScopeInstance, Name: TiDBEnableStatsOwner, Value: BoolToOnOff(config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load()), Type: TypeBool,
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
oldVal, newVal := config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load(), TiDBOptOn(val)
if oldVal != newVal {
err := switchStats(newVal)
if err != nil {
return err
}
config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Store(newVal)
}
return nil
},
GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
return BoolToOnOff(config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load()), nil
},
},
{Scope: ScopeInstance, Name: TiDBRCReadCheckTS, Value: BoolToOnOff(DefRCReadCheckTS), Type: TypeBool, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
EnableRCReadCheckTS.Store(TiDBOptOn(val))
return nil
Expand Down Expand Up @@ -3576,6 +3592,8 @@ const (
PluginLoad = "plugin_load"
// TiDBEnableDDL indicates whether the tidb-server campaigns the DDL owner,
TiDBEnableDDL = "tidb_enable_ddl"
// TiDBEnableStatsOwner indicates whether the tidb-server campaigns the Stats owner,
TiDBEnableStatsOwner = "tidb_enable_stats_owner"
// Port is the name for 'port' system variable.
Port = "port"
// DataDir is the name for 'datadir' system variable.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,10 @@ var (
SetLowResolutionTSOUpdateInterval func(interval time.Duration) error = nil
// ChangeSchemaCacheSize is called when tidb_schema_cache_size is changed.
ChangeSchemaCacheSize func(ctx context.Context, size uint64) error
// EnableStatsOwner is the func registered by stats to enable running stats in this instance.
EnableStatsOwner func() error = nil
// DisableStatsOwner is the func registered by stats to disable running stats in this instance.
DisableStatsOwner func() error = nil
)

// Hooks functions for Cluster Resource Control.
Expand Down
10 changes: 10 additions & 0 deletions pkg/sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,16 @@ func switchDDL(on bool) error {
return nil
}

// switchStats turns on/off stats owner in an instance
func switchStats(on bool) error {
if on && EnableStatsOwner != nil {
return EnableStatsOwner()
} else if !on && DisableStatsOwner != nil {
return DisableStatsOwner()
}
return nil
}

func collectAllowFuncName4ExpressionIndex() string {
str := make([]string, 0, len(GAFunction4ExpressionIndex))
for funcName := range GAFunction4ExpressionIndex {
Expand Down

0 comments on commit c56694c

Please sign in to comment.