From 87c72eacc2fea09722bbaaefbe1b40eaae988a75 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 23 Dec 2020 17:04:44 -0700 Subject: [PATCH 01/15] *: Move tikv gc configuration to sysvars --- session/bootstrap.go | 71 +++++++ sessionctx/variable/sysvar.go | 28 +++ sessionctx/variable/tidb_vars.go | 17 ++ store/tikv/gcworker/gc_worker.go | 281 ++++++-------------------- store/tikv/gcworker/gc_worker_test.go | 175 ++++++++-------- util/gcutil/gcutil.go | 20 +- 6 files changed, 280 insertions(+), 312 deletions(-) mode change 100644 => 100755 store/tikv/gcworker/gc_worker.go diff --git a/session/bootstrap.go b/session/bootstrap.go index dd5ae8b8bf941..856fecb069041 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -444,6 +444,8 @@ const ( version57 = 57 // version58 add `Repl_client_priv` and `Repl_slave_priv` to `mysql.user` version58 = 58 + // version59 moves TIKV gc settings to sysvars. + version59 = 59 ) var ( @@ -506,6 +508,7 @@ var ( upgradeToVer56, upgradeToVer57, upgradeToVer58, + upgradeToVer59, } ) @@ -1264,6 +1267,74 @@ func upgradeToVer58(s Session, ver int64) { mustExecute(s, "UPDATE HIGH_PRIORITY mysql.user SET Repl_slave_priv='Y',Repl_client_priv='Y'") } +func upgradeToVer59(s Session, ver int64) { + if ver >= version59 { + return + } + + inList := "('tikv_gc_enable', 'tikv_gc_run_interval', 'tikv_gc_life_time', 'tikv_gc_concurrency', 'tikv_gc_mode', 'tikv_gc_scan_lock_mode', 'tikv_gc_auto_concurrency')" + selectSQL := fmt.Sprintf(`SELECT HIGH_PRIORITY VARIABLE_NAME, VARIABLE_VALUE FROM mysql.tidb WHERE variable_name IN %s ORDER BY variable_name DESC`, inList) + ctx := context.Background() + rs, err := s.Execute(ctx, selectSQL) + terror.MustNil(err) + r := rs[0] + defer terror.Call(r.Close) + req := r.NewChunk() + it := chunk.NewIterator4Chunk(req) + err = r.Next(ctx, req) + for err == nil && req.NumRows() != 0 { + for row := it.Begin(); row != it.End(); row = it.Next() { + n := strings.ToLower(row.GetString(0)) + v := row.GetString(1) + switch n { + case "tikv_gc_enable": + if e := s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, boolVal(v)); e != nil { + logutil.BgLogger().Warn("could not upgrade sysvar", zap.Error(e)) + } + case "tikv_gc_run_interval": + if e := s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCRunInterval, v); e != nil { + logutil.BgLogger().Warn("could not upgrade sysvar", zap.Error(e)) + } + case "tikv_gc_life_time": + if e := s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCLifetime, v); e != nil { + logutil.BgLogger().Warn("could not upgrade sysvar", zap.Error(e)) + } + case "tikv_gc_concurrency": + if e := s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, v); e != nil { + logutil.BgLogger().Warn("could not upgrade sysvar", zap.Error(e)) + } + case "tikv_gc_mode": + if e := s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, v); e != nil { + logutil.BgLogger().Warn("could not upgrade sysvar", zap.Error(e)) + } + case "tikv_gc_scan_lock_mode": + if e := s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCScanLockMode, v); e != nil { + logutil.BgLogger().Warn("could not upgrade sysvar", zap.Error(e)) + } + case "tikv_gc_auto_concurrency": // handled last, overwrites tikv_gc_concurrency + if strings.EqualFold(v, "true") { + if e := s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, "-1"); e != nil { + logutil.BgLogger().Warn("could not upgrade sysvar", zap.Error(e)) + } + } + } + } + err = r.Next(ctx, req) + } + terror.MustNil(err) + // Remove from the status table. + updateSQL := fmt.Sprintf("DELETE FROM mysql.tidb WHERE variable_name IN %s", inList) + mustExecute(s, updateSQL) + +} + +func boolVal(str string) string { + if strings.EqualFold(str, "false") { + return "OFF" + } + return "ON" +} + func writeMemoryQuotaQuery(s Session) { comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x" sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%d', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%d'`, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 7b65bb3d06867..18252539e53c5 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -62,6 +62,8 @@ const ( TypeUnsigned TypeFlag = 5 // TypeTime for time of day (a TiDB extension) TypeTime TypeFlag = 6 + // TypeDuration for a golang duration (a TiDB extension) + TypeDuration TypeFlag = 7 // BoolOff is the canonical string representation of a boolean false. BoolOff = "OFF" @@ -134,6 +136,8 @@ func (sv *SysVar) ValidateFromType(vars *SessionVars, value string, scope ScopeF return sv.checkEnumSystemVar(value, vars) case TypeTime: return sv.checkTimeSystemVar(value, vars) + case TypeDuration: + return sv.checkDurationSystemVar(value, vars) } return value, nil // typeString } @@ -158,6 +162,22 @@ func (sv *SysVar) checkTimeSystemVar(value string, vars *SessionVars) (string, e return t.Format(FullDayTimeFormat), nil } +func (sv *SysVar) checkDurationSystemVar(value string, vars *SessionVars) (string, error) { + d, err := time.ParseDuration(value) + if err != nil { + return value, ErrWrongTypeForVar.GenWithStackByArgs(sv.Name) + } + // Check for min/max violations + if int64(d) < sv.MinValue { + return value, ErrWrongTypeForVar.GenWithStackByArgs(sv.Name) + } + if uint64(d) > sv.MaxValue { + return value, ErrWrongTypeForVar.GenWithStackByArgs(sv.Name) + } + // return a string representation of the duration + return d.String(), nil +} + func (sv *SysVar) checkUInt64SystemVar(value string, vars *SessionVars) (string, error) { if sv.AllowAutoValue && value == "-1" { return value, nil @@ -1190,6 +1210,14 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzeVersion, Value: strconv.Itoa(DefTiDBAnalyzeVersion), Type: TypeInt, MinValue: 1, MaxValue: 2}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableIndexMergeJoin, Value: BoolToOnOff(DefTiDBEnableIndexMergeJoin), Type: TypeBool}, {Scope: ScopeGlobal | ScopeSession, Name: TiDBTrackAggregateMemoryUsage, Value: BoolToOnOff(DefTiDBTrackAggregateMemoryUsage), Type: TypeBool}, + + /* tikv gc metrics */ + {Scope: ScopeGlobal, Name: TiKVGCEnable, Value: BoolOn, Type: TypeBool}, + {Scope: ScopeGlobal, Name: TiKVGCRunInterval, Value: "10m0s", Type: TypeDuration, MinValue: 0, MaxValue: math.MaxInt64}, + {Scope: ScopeGlobal, Name: TiKVGCLifetime, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, + {Scope: ScopeGlobal, Name: TiKVGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true}, + {Scope: ScopeGlobal, Name: TiKVGCMode, Value: "DISTRIBUTED", Type: TypeEnum, PossibleValues: []string{"DISTRIBUTED", "CENTRAL"}}, + {Scope: ScopeGlobal, Name: TiKVGCScanLockMode, Value: "PHYSICAL", Type: TypeEnum, PossibleValues: []string{"PHYSICAL", "LEGACY"}}, } // SynonymsSysVariables is synonyms of system variables. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 84216c32a913a..1a912ec162876 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -509,6 +509,23 @@ const ( TiDBTrackAggregateMemoryUsage = "tidb_track_aggregate_memory_usage" ) +// TiDB vars that have only global scope + +const ( + // TiKVGCEnable description + TiKVGCEnable = "tikv_gc_enable" + // TiKVGCRunInterval description + TiKVGCRunInterval = "tikv_gc_run_interval" + // TiKVGCLifetime description + TiKVGCLifetime = "tikv_gc_life_time" + // TiKVGCConcurrency description + TiKVGCConcurrency = "tikv_gc_concurrency" + // TiKVGCMode description + TiKVGCMode = "tikv_gc_mode" + // TiKVGCScanLockMode description + TiKVGCScanLockMode = "tikv_gc_scan_lock_mode" +) + // Default TiDB system variable values. const ( DefHostname = "localhost" diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go old mode 100644 new mode 100755 index 6a030cda6832e..a71ae4ca4adf1 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -23,7 +23,6 @@ import ( "os" "sort" "strconv" - "strings" "sync" "time" @@ -110,63 +109,27 @@ const ( booleanTrue = "true" booleanFalse = "false" - gcWorkerTickInterval = time.Minute - gcWorkerLease = time.Minute * 2 - gcLeaderUUIDKey = "tikv_gc_leader_uuid" - gcLeaderDescKey = "tikv_gc_leader_desc" - gcLeaderLeaseKey = "tikv_gc_leader_lease" - - gcLastRunTimeKey = "tikv_gc_last_run_time" - gcRunIntervalKey = "tikv_gc_run_interval" - gcDefaultRunInterval = time.Minute * 10 - gcWaitTime = time.Minute * 1 - gcRedoDeleteRangeDelay = 24 * time.Hour - - gcLifeTimeKey = "tikv_gc_life_time" - gcDefaultLifeTime = time.Minute * 10 - gcMinLifeTime = time.Minute * 10 - gcSafePointKey = "tikv_gc_safe_point" - gcConcurrencyKey = "tikv_gc_concurrency" - gcDefaultConcurrency = 2 - gcMinConcurrency = 1 - gcMaxConcurrency = 128 - // We don't want gc to sweep out the cached info belong to other processes, like coprocessor. - gcScanLockLimit = tikv.ResolvedCacheSize / 2 - - gcEnableKey = "tikv_gc_enable" - gcDefaultEnableValue = true - - gcModeKey = "tikv_gc_mode" - gcModeCentral = "central" - gcModeDistributed = "distributed" - gcModeDefault = gcModeDistributed - - gcScanLockModeKey = "tikv_gc_scan_lock_mode" - gcScanLockModeLegacy = "legacy" - gcScanLockModePhysical = "physical" - gcScanLockModeDefault = gcScanLockModePhysical - - gcAutoConcurrencyKey = "tikv_gc_auto_concurrency" - gcDefaultAutoConcurrency = true - + gcWorkerTickInterval = time.Minute + gcWorkerLease = time.Minute * 2 + gcLeaderUUIDKey = "tikv_gc_leader_uuid" + gcLeaderDescKey = "tikv_gc_leader_desc" + gcLeaderLeaseKey = "tikv_gc_leader_lease" + gcLastRunTimeKey = "tikv_gc_last_run_time" + gcWaitTime = time.Minute * 1 + gcRedoDeleteRangeDelay = 24 * time.Hour + gcSafePointKey = "tikv_gc_safe_point" gcWorkerServiceSafePointID = "gc_worker" + gcScanLockLimit = tikv.ResolvedCacheSize / 2 ) var gcSafePointCacheInterval = tikv.GcSafePointCacheInterval var gcVariableComments = map[string]string{ - gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)", - gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)", - gcLeaderLeaseKey: "Current GC worker leader lease. (DO NOT EDIT)", - gcLastRunTimeKey: "The time when last GC starts. (DO NOT EDIT)", - gcRunIntervalKey: "GC run interval, at least 10m, in Go format.", - gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.", - gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", - gcConcurrencyKey: "How many goroutines used to do GC parallel, [1, 128], default 2", - gcEnableKey: "Current GC enable status", - gcModeKey: "Mode of GC, \"central\" or \"distributed\"", - gcAutoConcurrencyKey: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", - gcScanLockModeKey: "Mode of scanning locks, \"physical\" or \"legacy\"", + gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)", + gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)", + gcLeaderLeaseKey: "Current GC worker leader lease. (DO NOT EDIT)", + gcLastRunTimeKey: "The time when last GC starts. (DO NOT EDIT)", + gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", } func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) { @@ -232,8 +195,8 @@ func (w *GCWorker) tick(ctx context.Context) { } } else { // Config metrics should always be updated by leader, set them to 0 when current instance is not leader. - metrics.GCConfigGauge.WithLabelValues(gcRunIntervalKey).Set(0) - metrics.GCConfigGauge.WithLabelValues(gcLifeTimeKey).Set(0) + metrics.GCConfigGauge.WithLabelValues(variable.TiKVGCRunInterval).Set(0) + metrics.GCConfigGauge.WithLabelValues(variable.TiKVGCLifetime).Set(0) } } @@ -308,11 +271,13 @@ func (w *GCWorker) prepare() (bool, uint64, error) { } func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) { - enable, err := w.checkGCEnable() + se := createSession(w.store) + defer se.Close() + enable, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCEnable) if err != nil { return false, 0, errors.Trace(err) } - if !enable { + if !variable.TiDBOptOn(enable) { logutil.Logger(ctx).Warn("[gc worker] gc status is disabled.") return false, 0, nil } @@ -380,77 +345,42 @@ func (w *GCWorker) getOracleTime() (time.Time, error) { return time.Unix(sec, nsec), nil } -func (w *GCWorker) checkGCEnable() (bool, error) { - return w.loadBooleanWithDefault(gcEnableKey, gcDefaultEnableValue) -} - -func (w *GCWorker) checkUseAutoConcurrency() (bool, error) { - return w.loadBooleanWithDefault(gcAutoConcurrencyKey, gcDefaultAutoConcurrency) -} - -func (w *GCWorker) loadBooleanWithDefault(key string, defaultValue bool) (bool, error) { - str, err := w.loadValueFromSysTable(key) - if err != nil { - return false, errors.Trace(err) - } - if str == "" { - // Save default value for gc enable key. The default value is always true. - defaultValueStr := booleanFalse - if defaultValue { - defaultValueStr = booleanTrue - } - err = w.saveValueToSysTable(key, defaultValueStr) - if err != nil { - return defaultValue, errors.Trace(err) - } - return defaultValue, nil - } - return strings.EqualFold(str, booleanTrue), nil -} - func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) { - useAutoConcurrency, err := w.checkUseAutoConcurrency() + se := createSession(w.store) + defer se.Close() + concurrencyStr, _ := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCConcurrency) + concurrency, err := strconv.ParseInt(concurrencyStr, 10, 32) if err != nil { - logutil.Logger(ctx).Error("[gc worker] failed to load config gc_auto_concurrency. use default value.", - zap.String("uuid", w.uuid), - zap.Error(err)) - useAutoConcurrency = gcDefaultAutoConcurrency + logutil.Logger(ctx).Error("[gc worker] could not convert TiKVGCConcurrency!", + zap.String("concurrency", concurrencyStr)) + return 2, nil // using previous default } - if !useAutoConcurrency { - return w.loadGCConcurrencyWithDefault() + if concurrency != -1 { // use an explicit value + return int(concurrency), nil } - stores, err := w.getStoresForGC(ctx) - concurrency := len(stores) - if err != nil { - logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency. use config.", - zap.String("uuid", w.uuid), - zap.Error(err)) - - concurrency, err = w.loadGCConcurrencyWithDefault() - if err != nil { - logutil.Logger(ctx).Error("[gc worker] failed to load gc concurrency from config. use default value.", - zap.String("uuid", w.uuid), - zap.Error(err)) - concurrency = gcDefaultConcurrency + // Calculate concurrency from an auto value (-1) + if stores, err := w.getStoresForGC(ctx); err == nil { + autoConcurrency := len(stores) + if autoConcurrency == 0 { + logutil.Logger(ctx).Error("[gc worker] no store is up", + zap.String("uuid", w.uuid)) + return 0, errors.New("[gc worker] no store is up") } + return autoConcurrency, nil } - - if concurrency == 0 { - logutil.Logger(ctx).Error("[gc worker] no store is up", - zap.String("uuid", w.uuid)) - return 0, errors.New("[gc worker] no store is up") - } - - return concurrency, nil + return 0, errors.New("[gc worker] Could not getStoresForGC") } func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) { - runInterval, err := w.loadDurationWithDefault(gcRunIntervalKey, gcDefaultRunInterval) - if err != nil { + se := createSession(w.store) + defer se.Close() + runIntervalStr, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCRunInterval) + runInterval, err2 := strToDuration(runIntervalStr) + if err != nil || err2 != nil { return false, errors.Trace(err) } - metrics.GCConfigGauge.WithLabelValues(gcRunIntervalKey).Set(runInterval.Seconds()) + metrics.GCConfigGauge.WithLabelValues(variable.TiKVGCRunInterval).Set(runInterval.Seconds()) lastRun, err := w.loadTime(gcLastRunTimeKey) if err != nil { return false, errors.Trace(err) @@ -467,30 +397,16 @@ func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) { return true, nil } -// validateGCLifeTime checks whether life time is small than min gc life time. -func (w *GCWorker) validateGCLifeTime(lifeTime time.Duration) (time.Duration, error) { - if lifeTime >= gcMinLifeTime { - return lifeTime, nil - } - - logutil.BgLogger().Info("[gc worker] invalid gc life time", - zap.Duration("get gc life time", lifeTime), - zap.Duration("min gc life time", gcMinLifeTime)) - - err := w.saveDuration(gcLifeTimeKey, gcMinLifeTime) - return gcMinLifeTime, err -} - func (w *GCWorker) calculateNewSafePoint(ctx context.Context, now time.Time) (*time.Time, uint64, error) { - lifeTime, err := w.loadDurationWithDefault(gcLifeTimeKey, gcDefaultLifeTime) - if err != nil { + se := createSession(w.store) + defer se.Close() + lifeTimeStr, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCLifetime) + lifeTime, err2 := strToDuration(lifeTimeStr) + if err != nil || err2 != nil { return nil, 0, errors.Trace(err) } - *lifeTime, err = w.validateGCLifeTime(*lifeTime) - if err != nil { - return nil, 0, err - } - metrics.GCConfigGauge.WithLabelValues(gcLifeTimeKey).Set(lifeTime.Seconds()) + + metrics.GCConfigGauge.WithLabelValues(variable.TiKVGCLifetime).Set(lifeTime.Seconds()) lastSafePoint, err := w.loadTime(gcSafePointKey) if err != nil { return nil, 0, errors.Trace(err) @@ -880,78 +796,31 @@ func (w *GCWorker) getStoresMapForGC(ctx context.Context) (map[uint64]*metapb.St return storesMap, nil } -func (w *GCWorker) loadGCConcurrencyWithDefault() (int, error) { - str, err := w.loadValueFromSysTable(gcConcurrencyKey) - if err != nil { - return gcDefaultConcurrency, errors.Trace(err) - } - if str == "" { - err = w.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcDefaultConcurrency)) - if err != nil { - return gcDefaultConcurrency, errors.Trace(err) - } - return gcDefaultConcurrency, nil - } - - jobConcurrency, err := strconv.Atoi(str) - if err != nil { - return gcDefaultConcurrency, err - } - - if jobConcurrency < gcMinConcurrency { - jobConcurrency = gcMinConcurrency - } - - if jobConcurrency > gcMaxConcurrency { - jobConcurrency = gcMaxConcurrency - } - - return jobConcurrency, nil -} - func (w *GCWorker) checkUseDistributedGC() (bool, error) { - str, err := w.loadValueFromSysTable(gcModeKey) + se := createSession(w.store) + defer se.Close() + str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCMode) if err != nil { return false, errors.Trace(err) } - if str == "" { - err = w.saveValueToSysTable(gcModeKey, gcModeDefault) - if err != nil { - return false, errors.Trace(err) - } - str = gcModeDefault - } - if strings.EqualFold(str, gcModeDistributed) { - return true, nil - } - if strings.EqualFold(str, gcModeCentral) { + // str is validated by the sysVar system, it can only be 'DISTRIBUTED' (default) or 'CENTRAL' + if str == "CENTRAL" { return false, nil } - logutil.BgLogger().Warn("[gc worker] distributed mode will be used", - zap.String("invalid gc mode", str)) return true, nil } func (w *GCWorker) checkUsePhysicalScanLock() (bool, error) { - str, err := w.loadValueFromSysTable(gcScanLockModeKey) + se := createSession(w.store) + defer se.Close() + str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCScanLockMode) if err != nil { return false, errors.Trace(err) } - if str == "" { - err = w.saveValueToSysTable(gcScanLockModeKey, gcScanLockModeDefault) - if err != nil { - return false, errors.Trace(err) - } - str = gcScanLockModeDefault - } - if strings.EqualFold(str, gcScanLockModePhysical) { + // str is validated by sysVar system, it will be PHYSICAL or LEGACY (default) + if str == "PHYSICAL" { return true, nil } - if strings.EqualFold(str, gcScanLockModeLegacy) { - return false, nil - } - logutil.BgLogger().Warn("[gc worker] legacy scan lock mode will be used", - zap.String("invalid scan lock mode", str)) return false, nil } @@ -1710,16 +1579,7 @@ func (w *GCWorker) loadTime(key string) (*time.Time, error) { return &t, nil } -func (w *GCWorker) saveDuration(key string, d time.Duration) error { - err := w.saveValueToSysTable(key, d.String()) - return errors.Trace(err) -} - -func (w *GCWorker) loadDuration(key string) (*time.Duration, error) { - str, err := w.loadValueFromSysTable(key) - if err != nil { - return nil, errors.Trace(err) - } +func strToDuration(str string) (*time.Duration, error) { if str == "" { return nil, nil } @@ -1730,21 +1590,6 @@ func (w *GCWorker) loadDuration(key string) (*time.Duration, error) { return &d, nil } -func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time.Duration, error) { - d, err := w.loadDuration(key) - if err != nil { - return nil, errors.Trace(err) - } - if d == nil { - err = w.saveDuration(key, def) - if err != nil { - return nil, errors.Trace(err) - } - return &def, nil - } - return d, nil -} - func (w *GCWorker) loadValueFromSysTable(key string) (string, error) { ctx := context.Background() se := createSession(w.store) diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 11b981aaa1758..d6449b65d5dde 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -271,10 +271,17 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { c.Assert(lastRun, NotNil) safePoint, err := s.gcWorker.loadTime(gcSafePointKey) c.Assert(err, IsNil) - s.timeEqual(c, safePoint.Add(gcDefaultLifeTime), now, 2*time.Second) + + se := createSession(s.gcWorker.store) + defer se.Close() + + gcDefaultLifeTime, err := strToDuration("10m0s") // variable.TiKVGCLifetime default + c.Assert(err, IsNil) + s.timeEqual(c, safePoint.Add(*gcDefaultLifeTime), now, 2*time.Second) // Change GC run interval. - err = s.gcWorker.saveDuration(gcRunIntervalKey, time.Minute*5) + t := time.Minute * 5 + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCRunInterval, t.String()) c.Assert(err, IsNil) s.oracle.AddOffset(time.Minute * 4) ok, _, err = s.gcWorker.prepare() @@ -286,7 +293,8 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { c.Assert(ok, IsTrue) // Change GC life time. - err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute*30) + t = time.Minute * 30 + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCLifetime, t.String()) c.Assert(err, IsNil) s.oracle.AddOffset(time.Minute * 5) ok, _, err = s.gcWorker.prepare() @@ -303,73 +311,73 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second) // Change GC concurrency. - concurrency, err := s.gcWorker.loadGCConcurrencyWithDefault() - c.Assert(err, IsNil) - c.Assert(concurrency, Equals, gcDefaultConcurrency) - - err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMinConcurrency)) + ctx := context.Background() + concurrencyStr, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCConcurrency) c.Assert(err, IsNil) - concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() + i64, err := strconv.ParseInt(concurrencyStr, 10, 32) c.Assert(err, IsNil) - c.Assert(concurrency, Equals, gcMinConcurrency) + concurrency := int(i64) + c.Assert(concurrency, Equals, -1) // variable.TiKVGCConcurrency default + autoConcurrencyFinalValue, err := s.gcWorker.getGCConcurrency(ctx) + c.Assert(autoConcurrencyFinalValue > 0, Equals, true) - err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(-1)) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, "1") // set to the min non auto value. c.Assert(err, IsNil) - concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() + concurrency, err = s.gcWorker.getGCConcurrency(ctx) c.Assert(err, IsNil) - c.Assert(concurrency, Equals, gcMinConcurrency) + c.Assert(concurrency, Equals, 1) // the value just set. - err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(1000000)) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, "-2") // invalid option + c.Assert(err, NotNil) + concurrency, err = s.gcWorker.getGCConcurrency(ctx) c.Assert(err, IsNil) - concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() + c.Assert(concurrency, Equals, 1) // the previous value. + + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, strconv.Itoa(1000000)) // invalid option + c.Assert(err, NotNil) + concurrency, err = s.gcWorker.getGCConcurrency(ctx) c.Assert(err, IsNil) - c.Assert(concurrency, Equals, gcMaxConcurrency) + c.Assert(concurrency, Equals, 1) // the previous value. // Change GC enable status. s.oracle.AddOffset(time.Minute * 40) - err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, variable.BoolOff) c.Assert(err, IsNil) ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) c.Assert(ok, IsFalse) - err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, variable.BoolOn) c.Assert(err, IsNil) ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) c.Assert(ok, IsTrue) // Check gc life time small than min. - s.oracle.AddOffset(time.Minute * 40) - err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCLifetime, time.Minute.String()) + c.Assert(err, NotNil) // one minute is too short. + + t = time.Minute * 10 + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCLifetime, t.String()) c.Assert(err, IsNil) - ok, _, err = s.gcWorker.prepare() + str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCLifetime) c.Assert(err, IsNil) - c.Assert(ok, IsTrue) - lifeTime, err := s.gcWorker.loadDuration(gcLifeTimeKey) + lifeTime, err := strToDuration(str) c.Assert(err, IsNil) - c.Assert(*lifeTime, Equals, gcMinLifeTime) + c.Assert(*lifeTime, Equals, t) s.oracle.AddOffset(time.Minute * 40) - err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute*30) + t = time.Minute * 30 + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCLifetime, t.String()) c.Assert(err, IsNil) ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) c.Assert(ok, IsTrue) - lifeTime, err = s.gcWorker.loadDuration(gcLifeTimeKey) - c.Assert(err, IsNil) - c.Assert(*lifeTime, Equals, 30*time.Minute) - - // Change auto concurrency - err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanFalse) - c.Assert(err, IsNil) - useAutoConcurrency, err := s.gcWorker.checkUseAutoConcurrency() + str, err = se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCLifetime) c.Assert(err, IsNil) - c.Assert(useAutoConcurrency, IsFalse) - err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanTrue) + lifeTime, err = strToDuration(str) c.Assert(err, IsNil) - useAutoConcurrency, err = s.gcWorker.checkUseAutoConcurrency() - c.Assert(err, IsNil) - c.Assert(useAutoConcurrency, IsTrue) + c.Assert(*lifeTime, Equals, t) + } func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { @@ -405,24 +413,18 @@ func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { } func (s *testGCWorkerSuite) TestGetGCConcurrency(c *C) { - // Pick a concurrency that doesn't equal to the number of stores. - concurrencyConfig := 25 - c.Assert(concurrencyConfig, Not(Equals), len(s.cluster.GetAllStores())) - err := s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(concurrencyConfig)) - c.Assert(err, IsNil) - ctx := context.Background() + // Explicitly set the concurrency to -1 (AUTO) and then verify that + // The getGCConcurrency returns the number equal to the number of stores. - err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanFalse) - c.Assert(err, IsNil) - concurrency, err := s.gcWorker.getGCConcurrency(ctx) - c.Assert(err, IsNil) - c.Assert(concurrency, Equals, concurrencyConfig) + se := createSession(s.gcWorker.store) + defer se.Close() - err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanTrue) - c.Assert(err, IsNil) - concurrency, err = s.gcWorker.getGCConcurrency(ctx) + err := se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, "-1") c.Assert(err, IsNil) + + ctx := context.Background() + concurrency, err := s.gcWorker.getGCConcurrency(ctx) c.Assert(concurrency, Equals, len(s.cluster.GetAllStores())) } @@ -433,75 +435,83 @@ func (s *testGCWorkerSuite) TestDoGC(c *C) { gcSafePointCacheInterval = 1 p := s.createGCProbe(c, "k1") - err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), gcDefaultConcurrency) + err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), 2) // gcDefaultConcurrency c.Assert(err, IsNil) s.checkCollected(c, p) p = s.createGCProbe(c, "k1") - err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), gcMinConcurrency) + err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), 1) // gcMinConcurrency c.Assert(err, IsNil) s.checkCollected(c, p) p = s.createGCProbe(c, "k1") - err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), gcMaxConcurrency) + err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), 128) // gcMaxConcurrency c.Assert(err, IsNil) s.checkCollected(c, p) } func (s *testGCWorkerSuite) TestCheckGCMode(c *C) { + + se := createSession(s.gcWorker.store) + defer se.Close() + useDistributedGC, err := s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) c.Assert(useDistributedGC, Equals, true) // Now the row must be set to the default value. - str, err := s.gcWorker.loadValueFromSysTable(gcModeKey) + str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCMode) c.Assert(err, IsNil) - c.Assert(str, Equals, gcModeDistributed) + c.Assert(str, Equals, "DISTRIBUTED") - err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "cenTraL") c.Assert(err, IsNil) useDistributedGC, err = s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) c.Assert(useDistributedGC, Equals, false) - err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeDistributed) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "distribuTed") c.Assert(err, IsNil) useDistributedGC, err = s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) c.Assert(useDistributedGC, Equals, true) - err = s.gcWorker.saveValueToSysTable(gcModeKey, "invalid_mode") - c.Assert(err, IsNil) - useDistributedGC, err = s.gcWorker.checkUseDistributedGC() - c.Assert(err, IsNil) - c.Assert(useDistributedGC, Equals, true) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "invalid_mode") + c.Assert(err, NotNil) // won't change the value + str, err = se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCMode) + c.Assert(str, Equals, "DISTRIBUTED") // keeps previous value. + } func (s *testGCWorkerSuite) TestCheckScanLockMode(c *C) { + + se := createSession(s.gcWorker.store) + defer se.Close() + usePhysical, err := s.gcWorker.checkUsePhysicalScanLock() c.Assert(err, IsNil) - c.Assert(usePhysical, Equals, gcScanLockModeDefault == gcScanLockModePhysical) + c.Assert(usePhysical, Equals, true) // Now the row must be set to the default value. - str, err := s.gcWorker.loadValueFromSysTable(gcScanLockModeKey) + str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCScanLockMode) c.Assert(err, IsNil) - c.Assert(str, Equals, gcScanLockModeDefault) + c.Assert(str, Equals, "PHYSICAL") - err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModeLegacy) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCScanLockMode, "legaCY") c.Assert(err, IsNil) usePhysical, err = s.gcWorker.checkUsePhysicalScanLock() c.Assert(err, IsNil) c.Assert(usePhysical, Equals, false) - err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModePhysical) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCScanLockMode, "phySICAL") c.Assert(err, IsNil) usePhysical, err = s.gcWorker.checkUsePhysicalScanLock() c.Assert(err, IsNil) c.Assert(usePhysical, Equals, true) - err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, "invalid_mode") - c.Assert(err, IsNil) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCScanLockMode, "invalid_mode") + c.Assert(err, NotNil) usePhysical, err = s.gcWorker.checkUsePhysicalScanLock() c.Assert(err, IsNil) - c.Assert(usePhysical, Equals, false) + c.Assert(usePhysical, Equals, true) // unchanged } func (s *testGCWorkerSuite) TestNeedsGCOperationForStore(c *C) { @@ -745,14 +755,18 @@ func (c *testGCWorkerClient) SendRequest(ctx context.Context, addr string, req * func (s *testGCWorkerSuite) TestLeaderTick(c *C) { gcSafePointCacheInterval = 0 - veryLong := gcDefaultLifeTime * 10 + veryLong := time.Minute * 10 * 10 // gcDefaultLifeTime * 10 // Avoid failing at interval check. `lastFinish` is checked by os time. s.gcWorker.lastFinish = time.Now().Add(-veryLong) // Use central mode to do this test. - err := s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) + + se := createSession(s.gcWorker.store) + defer se.Close() + + err := se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "CENTRAL") c.Assert(err, IsNil) p := s.createGCProbe(c, "k1") - s.oracle.AddOffset(gcDefaultLifeTime * 2) + s.oracle.AddOffset(time.Minute * 10 * 2) // gcDefaultLifeTime * 2 // Skip if GC is running. s.gcWorker.gcIsRunning = true @@ -765,12 +779,12 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) { c.Assert(err, IsNil) // Skip if prepare failed (disabling GC will make prepare returns ok = false). - err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, "OFF") c.Assert(err, IsNil) err = s.gcWorker.leaderTick(context.Background()) c.Assert(err, IsNil) s.checkNotCollected(c, p) - err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, "ON") c.Assert(err, IsNil) // Reset GC last run time err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong)) @@ -805,7 +819,7 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) { c.Assert(err, IsNil) s.gcWorker.lastFinish = time.Now().Add(-veryLong) p = s.createGCProbe(c, "k1") - s.oracle.AddOffset(gcDefaultLifeTime * 2) + s.oracle.AddOffset(time.Minute * 10 * 2) // gcDefaultLifeTime * 2 err = s.gcWorker.leaderTick(context.Background()) c.Assert(err, IsNil) @@ -897,7 +911,10 @@ func (s *testGCWorkerSuite) TestRunGCJob(c *C) { c.Assert(err, NotNil) // Test central mode - err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) + se := createSession(s.gcWorker.store) + defer se.Close() + + err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "CENTRAL") c.Assert(err, IsNil) useDistributedGC, err = s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) diff --git a/util/gcutil/gcutil.go b/util/gcutil/gcutil.go index f265e4dd0603f..6e1d9fdc93bc7 100644 --- a/util/gcutil/gcutil.go +++ b/util/gcutil/gcutil.go @@ -26,36 +26,26 @@ import ( const ( selectVariableValueSQL = `SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name='%s'` - insertVariableValueSQL = `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') - ON DUPLICATE KEY - UPDATE variable_value = '%[2]s', comment = '%[3]s'` + insertVariableValueSQL = `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') ON DUPLICATE KEY UPDATE variable_value = '%[2]s', comment = '%[3]s'` ) // CheckGCEnable is use to check whether GC is enable. func CheckGCEnable(ctx sessionctx.Context) (enable bool, err error) { - sql := fmt.Sprintf(selectVariableValueSQL, "tikv_gc_enable") - rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) + val, err := ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCEnable) if err != nil { return false, errors.Trace(err) } - if len(rows) != 1 { - return false, errors.New("can not get 'tikv_gc_enable'") - } - return rows[0].GetString(0) == "true", nil + return variable.TiDBOptOn(val), nil } // DisableGC will disable GC enable variable. func DisableGC(ctx sessionctx.Context) error { - sql := fmt.Sprintf(insertVariableValueSQL, "tikv_gc_enable", "false", "Current GC enable status") - _, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) - return errors.Trace(err) + return ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, variable.BoolOff) } // EnableGC will enable GC enable variable. func EnableGC(ctx sessionctx.Context) error { - sql := fmt.Sprintf(insertVariableValueSQL, "tikv_gc_enable", "true", "Current GC enable status") - _, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) - return errors.Trace(err) + return ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, variable.BoolOn) } // ValidateSnapshot checks that the newly set snapshot time is after GC safe point time. From 9c4e9cc861161062642fb68fad03ba12ebcc004e Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 23 Dec 2020 17:55:46 -0700 Subject: [PATCH 02/15] fix make check errors --- store/tikv/gcworker/gc_worker.go | 6 +++++- store/tikv/gcworker/gc_worker_test.go | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index a71ae4ca4adf1..9c85907ac8d29 100755 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -348,7 +348,11 @@ func (w *GCWorker) getOracleTime() (time.Time, error) { func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) { se := createSession(w.store) defer se.Close() - concurrencyStr, _ := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCConcurrency) + concurrencyStr, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCConcurrency) + if err != nil { + logutil.Logger(ctx).Error("[gc worker] could not fetch tikv_gc_concurrency") + return 2, nil + } concurrency, err := strconv.ParseInt(concurrencyStr, 10, 32) if err != nil { logutil.Logger(ctx).Error("[gc worker] could not convert TiKVGCConcurrency!", diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index d6449b65d5dde..865642a867408 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -318,7 +318,8 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { c.Assert(err, IsNil) concurrency := int(i64) c.Assert(concurrency, Equals, -1) // variable.TiKVGCConcurrency default - autoConcurrencyFinalValue, err := s.gcWorker.getGCConcurrency(ctx) + autoConcurrencyFinalValue, err1 := s.gcWorker.getGCConcurrency(ctx) + c.Assert(err1, IsNil) c.Assert(autoConcurrencyFinalValue > 0, Equals, true) err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, "1") // set to the min non auto value. @@ -424,7 +425,8 @@ func (s *testGCWorkerSuite) TestGetGCConcurrency(c *C) { c.Assert(err, IsNil) ctx := context.Background() - concurrency, err := s.gcWorker.getGCConcurrency(ctx) + concurrency, err1 := s.gcWorker.getGCConcurrency(ctx) + c.Assert(err1, IsNil) c.Assert(concurrency, Equals, len(s.cluster.GetAllStores())) } @@ -478,6 +480,7 @@ func (s *testGCWorkerSuite) TestCheckGCMode(c *C) { err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "invalid_mode") c.Assert(err, NotNil) // won't change the value str, err = se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCMode) + c.Assert(err, IsNil) c.Assert(str, Equals, "DISTRIBUTED") // keeps previous value. } From 55dc0c69286e63fee14d021e6cbce98839062202 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 23 Dec 2020 21:30:24 -0700 Subject: [PATCH 03/15] fix remaining tests --- ddl/serial_test.go | 7 +++---- executor/executor_test.go | 11 +++++------ server/http_handler_test.go | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/ddl/serial_test.go b/ddl/serial_test.go index a106876521a10..b41ef3628ab29 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -622,7 +622,7 @@ func (s *testSerialSuite) TestRecoverTableByJobID(c *C) { ON DUPLICATE KEY UPDATE variable_value = '%[1]s'` // clear GC variables first. - tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point' )") tk.MustExec("insert into t_recover values (1),(2),(3)") tk.MustExec("drop table t_recover") @@ -650,9 +650,8 @@ func (s *testSerialSuite) TestRecoverTableByJobID(c *C) { tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) // if GC enable is not exists in mysql.tidb - _, err = tk.Exec(fmt.Sprintf("recover table by job %d", jobID)) - c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, "[ddl:-1]can not get 'tikv_gc_enable'") + tk.MustExec(fmt.Sprintf("recover table by job %d", jobID)) + tk.MustExec("DROP TABLE t_recover") err = gcutil.EnableGC(tk.Se) c.Assert(err, IsNil) diff --git a/executor/executor_test.go b/executor/executor_test.go index d3276c151bbcc..afe134b9b6afc 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -5342,7 +5342,7 @@ func (s *testRecoverTable) TestRecoverTable(c *C) { ON DUPLICATE KEY UPDATE variable_value = '%[1]s'` // clear GC variables first. - tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point' )") tk.MustExec("insert into t_recover values (1),(2),(3)") tk.MustExec("drop table t_recover") @@ -5354,10 +5354,9 @@ func (s *testRecoverTable) TestRecoverTable(c *C) { // set GC safe point tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) - // if GC enable is not exists in mysql.tidb - _, err = tk.Exec("recover table t_recover") - c.Assert(err, NotNil) - c.Assert(err.Error(), Equals, "[ddl:-1]can not get 'tikv_gc_enable'") + // Should recover, and we can drop it straight away. + tk.MustExec("recover table t_recover") + tk.MustExec("drop table t_recover") err = gcutil.EnableGC(tk.Se) c.Assert(err, IsNil) @@ -5453,7 +5452,7 @@ func (s *testRecoverTable) TestFlashbackTable(c *C) { ON DUPLICATE KEY UPDATE variable_value = '%[1]s'` // Clear GC variables first. - tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point' )") // Set GC safe point tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) // Set GC enable. diff --git a/server/http_handler_test.go b/server/http_handler_test.go index fd1c3a95a7fac..c967380bff268 100644 --- a/server/http_handler_test.go +++ b/server/http_handler_test.go @@ -673,7 +673,7 @@ func (ts *HTTPHandlerTestSuite) TestTiFlashReplica(c *C) { ddl.EmulatorGCDisable() gcTimeFormat := "20060102-15:04:05 -0700 MST" timeBeforeDrop := time.Now().Add(0 - 48*60*60*time.Second).Format(gcTimeFormat) - safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', ''),('tikv_gc_enable','true','') + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') ON DUPLICATE KEY UPDATE variable_value = '%[1]s'` // Set GC safe point and enable GC. From 3944be087e36b965316bceb938d30e5698d8a761 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Fri, 25 Dec 2020 12:48:09 -0700 Subject: [PATCH 04/15] read/write from mysql.tidb values if they exist. --- session/session.go | 107 ++++++++++++++++++++++++++++++- store/tikv/gcworker/gc_worker.go | 27 ++++++++ 2 files changed, 132 insertions(+), 2 deletions(-) diff --git a/session/session.go b/session/session.go index e483605ca1809..af3351961e706 100644 --- a/session/session.go +++ b/session/session.go @@ -95,6 +95,16 @@ var ( sessionExecuteParseDurationGeneral = metrics.SessionExecuteParseDuration.WithLabelValues(metrics.LblGeneral) ) +var gcVariableComments = map[string]string{ + variable.TiKVGCRunInterval: "GC run interval, at least 10m, in Go format.", + variable.TiKVGCLifetime: "All versions within life time will not be collected by GC, at least 10m, in Go format.", + variable.TiKVGCConcurrency: "How many goroutines used to do GC parallel, [1, 128], default 2", + variable.TiKVGCEnable: "Current GC enable status", + variable.TiKVGCMode: "Mode of GC, \"central\" or \"distributed\"", + "tikv_gc_auto_concurrency": "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", + variable.TiKVGCScanLockMode: "Mode of scanning locks, \"physical\" or \"legacy\"", +} + // Session context, it is consistent with the lifecycle of a client connection. type Session interface { sessionctx.Context @@ -998,7 +1008,9 @@ func (s *session) GetAllSysVars() (map[string]string, error) { ret := make(map[string]string, len(rows)) for _, r := range rows { k, v := r.GetString(0), r.GetString(1) - ret[k] = v + if v, err = s.GetTiKVGlobalSysVar(k, v); err != nil { + ret[k] = v + } } return ret, nil } @@ -1025,7 +1037,8 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { } return "", err } - return sysVar, nil + // Update mysql.tidb values if required + return s.GetTiKVGlobalSysVar(name, sysVar) } // SetGlobalSysVar implements GlobalVarAccessor.SetGlobalSysVar interface. @@ -1052,6 +1065,10 @@ func (s *session) SetGlobalSysVar(name, value string) error { return err } name = strings.ToLower(name) + // update mysql.tidb if required. + if err = s.SetTiKVGlobalSysVar(name, sVal); err != nil { + return err + } variable.CheckDeprecationSetSystemVar(s.sessionVars, name) sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, mysql.SystemDB, mysql.GlobalVariablesTable, name, sVal) @@ -1059,6 +1076,92 @@ func (s *session) SetGlobalSysVar(name, value string) error { return err } +// SetTiKVGlobalSysVar handles tikv_* sysvars which need to update mysql.tidb +// for backwards compatibility. Validation has already been performed. +func (s *session) SetTiKVGlobalSysVar(name, val string) error { + switch name { + case variable.TiKVGCConcurrency: + autoConcurrency := "false" + if val == "-1" { + autoConcurrency = "true" + } + sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%s', '%s', '%s') + ON DUPLICATE KEY UPDATE variable_value = '%s'`, "tikv_gc_auto_concurrency", autoConcurrency, gcVariableComments[name], autoConcurrency) + _, _, err := s.ExecRestrictedSQL(sql) + if err != nil { + return err + } + fallthrough + case variable.TiKVGCEnable, variable.TiKVGCRunInterval, variable.TiKVGCLifetime, variable.TiKVGCMode, variable.TiKVGCScanLockMode: + val = onOffToTrueFalse(val) + sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%s', '%s', '%s') + ON DUPLICATE KEY UPDATE variable_value = '%s'`, name, val, gcVariableComments[name], val) + _, _, err := s.ExecRestrictedSQL(sql) + return err + } + return nil // not a TiKV sysVar +} + +// In mysql.tidb the convention has been to store the string value "true"/"false", +// but sysvars use the convention ON/OFF. +func trueFalseToOnOff(str string) string { + if strings.EqualFold("true", str) { + return variable.BoolOn + } else if strings.EqualFold("false", str) { + return variable.BoolOff + } + return str +} + +// In mysql.tidb the convention has been to store the string value "true"/"false", +// but sysvars use the convention ON/OFF. +func onOffToTrueFalse(str string) string { + if strings.EqualFold("ON", str) { + return "true" + } else if strings.EqualFold("OFF", str) { + return "false" + } + return str +} + +// GetTiKVGlobalSysVar handles tikv_* sysvars which need +// to read from mysql.tidb for backwards compatibility. +func (s *session) GetTiKVGlobalSysVar(name, val string) (string, error) { + switch name { + case variable.TiKVGCEnable, variable.TiKVGCRunInterval, variable.TiKVGCLifetime, + variable.TiKVGCConcurrency, variable.TiKVGCMode, variable.TiKVGCScanLockMode: + sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, name) + tblValue, err := s.getExecRet(s, sql) + if err != nil { + return val, nil // mysql.tidb value does not exist. + } + // Run validation on the tblValue. This will return an error if it can't be validated, + // but will also make it more consistent: disTribuTeD -> DISTRIBUTED etc + tblValue = trueFalseToOnOff(tblValue) + validatedVal, err := variable.ValidateSetSystemVar(s.sessionVars, name, tblValue, variable.ScopeGlobal) + if err != nil { + logutil.Logger(context.Background()).Warn("restoring sysvar value since validating mysql.tidb value failed", + zap.Error(err), + zap.String("name", name), + zap.String("tblValue", tblValue), + zap.String("restoredValue", val)) + sql := fmt.Sprintf(`REPLACE INTO mysql.tidb (variable_name, variable_value, comment) + VALUES ('%s', '%s', '%s')`, name, val, gcVariableComments[name]) + _, _, err = s.ExecRestrictedSQL(sql) + return val, err + } + if validatedVal != val { + // The sysvar value is out of sync. + sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, + mysql.SystemDB, mysql.GlobalVariablesTable, name, validatedVal) + _, _, err = s.ExecRestrictedSQL(sql) + return validatedVal, err + } + return validatedVal, nil + } + return val, nil // not a TiKV sysVar +} + func (s *session) ensureFullGlobalStats() error { rows, _, err := s.ExecRestrictedSQL(`select count(1) from information_schema.tables t where t.create_options = 'partitioned' and not exists (select 1 from mysql.stats_meta m where m.table_id = t.tidb_table_id)`) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 9c85907ac8d29..bff9425c28c94 100755 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -270,6 +270,27 @@ func (w *GCWorker) prepare() (bool, uint64, error) { return doGC, safePoint, errors.Trace(err) } +func (w *GCWorker) initTableValues() error { + + // This is a static set of data observed in TiDB on a default install *prior* to moving tikv values to sysVars. + // It is no longer needed, but some examples in the wild and in manual pages use "UPDATE". + // This requires a row to exist for the operation to succeed. Because the ON DUPLICATE KEY UPDATE + // action is UPDATE variable_name = variable_name, it is a noop if the row already exists. + + stmt := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES + ('tikv_gc_enable', 'true', 'Current GC enable status'), + ('tikv_gc_run_interval', '10m0s', 'GC run interval, at least 10m, in Go format.'), + ('tikv_gc_life_time', '10m0s', 'All versions within life time will not be collected by GC, at least 10m, in Go format.') + ON DUPLICATE KEY + UPDATE variable_name = variable_name` + + se := createSession(w.store) + defer se.Close() + _, err := se.Execute(context.Background(), stmt) + return err + +} + func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) { se := createSession(w.store) defer se.Close() @@ -277,6 +298,12 @@ func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) { if err != nil { return false, 0, errors.Trace(err) } + + if err := w.initTableValues(); err != nil { // preserve consistency + logutil.Logger(ctx).Warn("[gc worker] could not initialize mysql.tidb values.") + return false, 0, err + } + if !variable.TiDBOptOn(enable) { logutil.Logger(ctx).Warn("[gc worker] gc status is disabled.") return false, 0, nil From 1d931bc13d5d4d7eb5edab7e6892cdb20cd2651b Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Fri, 25 Dec 2020 13:06:43 -0700 Subject: [PATCH 05/15] Fix handling for auto value --- session/session.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/session/session.go b/session/session.go index af3351961e706..a779bad1fb9be 100644 --- a/session/session.go +++ b/session/session.go @@ -1128,8 +1128,16 @@ func onOffToTrueFalse(str string) string { // to read from mysql.tidb for backwards compatibility. func (s *session) GetTiKVGlobalSysVar(name, val string) (string, error) { switch name { + case variable.TiKVGCConcurrency: + // Check if autoconcurrency is set + sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, "tikv_gc_auto_concurrency") + autoConcurrencyVal, err := s.getExecRet(s, sql) + if err == nil && strings.EqualFold(autoConcurrencyVal, "true") { + return "-1", nil // convention for "AUTO" + } + fallthrough case variable.TiKVGCEnable, variable.TiKVGCRunInterval, variable.TiKVGCLifetime, - variable.TiKVGCConcurrency, variable.TiKVGCMode, variable.TiKVGCScanLockMode: + variable.TiKVGCMode, variable.TiKVGCScanLockMode: sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, name) tblValue, err := s.getExecRet(s, sql) if err != nil { From a454ad157627e266b0a7ba37b9ee05669b612c35 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Fri, 25 Dec 2020 16:22:39 -0700 Subject: [PATCH 06/15] Add tests for new behavior --- session/session_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/session/session_test.go b/session/session_test.go index 4fcf91414c187..e830a53279580 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3819,3 +3819,38 @@ func (s *testSessionSerialSuite) TestIssue21943(c *C) { _, err = tk.Exec("set @@last_plan_from_cache='123';") c.Assert(err.Error(), Equals, "[variable:1238]Variable 'last_plan_from_cache' is a read only variable") } + +func (s *testSessionSerialSuite) TestTiKVSystemVars(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + + result := tk.MustQuery("SHOW GLOBAL VARIABLES LIKE 'tikv_gc_enable'") // default is on from the sysvar + result.Check(testkit.Rows("tikv_gc_enable ON")) + result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_enable'") + result.Check(testkit.Rows()) // but no value in the table (yet) because the value has not been set and the GC has never been run + + // update will set a value in the table + tk.MustExec("SET GLOBAL tikv_gc_enable = 1") + result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_enable'") + result.Check(testkit.Rows("true")) + + tk.MustExec("UPDATE mysql.tidb SET variable_value = 'false' WHERE variable_name='tikv_gc_enable'") + result = tk.MustQuery("SELECT @@tikv_gc_enable;") + result.Check(testkit.Rows("0")) // reads from mysql.tidb value and changes to false + + tk.MustExec("SET GLOBAL tikv_gc_concurrency = -1") // sets auto concurrency and concurrency + result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_auto_concurrency'") + result.Check(testkit.Rows("true")) + result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_concurrency'") + result.Check(testkit.Rows("-1")) + + tk.MustExec("SET GLOBAL tikv_gc_concurrency = 5") // sets auto concurrency and concurrency + result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_auto_concurrency'") + result.Check(testkit.Rows("false")) + result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_concurrency'") + result.Check(testkit.Rows("5")) + + tk.MustExec("UPDATE mysql.tidb SET variable_value = 'true' WHERE variable_name='tikv_gc_auto_concurrency'") + result = tk.MustQuery("SELECT @@tikv_gc_concurrency;") + result.Check(testkit.Rows("-1")) // because auto_concurrency is turned on it takes precedence + +} From 6e3b46176d0a4f8b1c50fec22d7a739d6feb8a17 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sun, 3 Jan 2021 21:36:25 -0700 Subject: [PATCH 07/15] Address reviewer feedback --- session/session.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/session/session.go b/session/session.go index a779bad1fb9be..81d2125c39d16 100644 --- a/session/session.go +++ b/session/session.go @@ -74,6 +74,7 @@ import ( "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" @@ -93,6 +94,8 @@ var ( sessionExecuteCompileDurationGeneral = metrics.SessionExecuteCompileDuration.WithLabelValues(metrics.LblGeneral) sessionExecuteParseDurationInternal = metrics.SessionExecuteParseDuration.WithLabelValues(metrics.LblInternal) sessionExecuteParseDurationGeneral = metrics.SessionExecuteParseDuration.WithLabelValues(metrics.LblGeneral) + + tiKVGCAutoConcurrency = "tikv_gc_auto_concurrency" ) var gcVariableComments = map[string]string{ @@ -101,7 +104,7 @@ var gcVariableComments = map[string]string{ variable.TiKVGCConcurrency: "How many goroutines used to do GC parallel, [1, 128], default 2", variable.TiKVGCEnable: "Current GC enable status", variable.TiKVGCMode: "Mode of GC, \"central\" or \"distributed\"", - "tikv_gc_auto_concurrency": "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", + tiKVGCAutoConcurrency: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", variable.TiKVGCScanLockMode: "Mode of scanning locks, \"physical\" or \"legacy\"", } @@ -1071,7 +1074,9 @@ func (s *session) SetGlobalSysVar(name, value string) error { } variable.CheckDeprecationSetSystemVar(s.sessionVars, name) sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, name, sVal) + mysql.SystemDB, mysql.GlobalVariablesTable, name, + stringutil.Escape(sVal, s.sessionVars.SQLMode), + ) _, _, err = s.ExecRestrictedSQL(sql) return err } @@ -1085,8 +1090,8 @@ func (s *session) SetTiKVGlobalSysVar(name, val string) error { if val == "-1" { autoConcurrency = "true" } - sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%s', '%s', '%s') - ON DUPLICATE KEY UPDATE variable_value = '%s'`, "tikv_gc_auto_concurrency", autoConcurrency, gcVariableComments[name], autoConcurrency) + sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, tiKVGCAutoConcurrency, autoConcurrency, gcVariableComments[name]) _, _, err := s.ExecRestrictedSQL(sql) if err != nil { return err @@ -1094,8 +1099,8 @@ func (s *session) SetTiKVGlobalSysVar(name, val string) error { fallthrough case variable.TiKVGCEnable, variable.TiKVGCRunInterval, variable.TiKVGCLifetime, variable.TiKVGCMode, variable.TiKVGCScanLockMode: val = onOffToTrueFalse(val) - sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%s', '%s', '%s') - ON DUPLICATE KEY UPDATE variable_value = '%s'`, name, val, gcVariableComments[name], val) + sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, name, stringutil.Escape(val, s.sessionVars.SQLMode), gcVariableComments[name]) _, _, err := s.ExecRestrictedSQL(sql) return err } @@ -1130,7 +1135,7 @@ func (s *session) GetTiKVGlobalSysVar(name, val string) (string, error) { switch name { case variable.TiKVGCConcurrency: // Check if autoconcurrency is set - sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, "tikv_gc_auto_concurrency") + sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, tiKVGCAutoConcurrency) autoConcurrencyVal, err := s.getExecRet(s, sql) if err == nil && strings.EqualFold(autoConcurrencyVal, "true") { return "-1", nil // convention for "AUTO" @@ -1154,14 +1159,14 @@ func (s *session) GetTiKVGlobalSysVar(name, val string) (string, error) { zap.String("tblValue", tblValue), zap.String("restoredValue", val)) sql := fmt.Sprintf(`REPLACE INTO mysql.tidb (variable_name, variable_value, comment) - VALUES ('%s', '%s', '%s')`, name, val, gcVariableComments[name]) + VALUES ('%s', '%s', '%s')`, name, stringutil.Escape(val, s.sessionVars.SQLMode), gcVariableComments[name]) _, _, err = s.ExecRestrictedSQL(sql) return val, err } if validatedVal != val { // The sysvar value is out of sync. sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, name, validatedVal) + mysql.SystemDB, mysql.GlobalVariablesTable, name, stringutil.Escape(validatedVal, s.sessionVars.SQLMode)) _, _, err = s.ExecRestrictedSQL(sql) return validatedVal, err } From 90993cdef5a3ccc701d4ce06f77a74d047af85fb Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sun, 3 Jan 2021 22:12:53 -0700 Subject: [PATCH 08/15] Address reviewer feedback --- executor/set_test.go | 4 ++++ session/session.go | 17 ++++++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/executor/set_test.go b/executor/set_test.go index aa7b312529b6d..d94014defa7cd 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -510,6 +510,10 @@ func (s *testSerialSuite1) TestSetVar(c *C) { tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("1")) tk.MustExec("SET GLOBAL tidb_enable_extended_stats = off") tk.MustQuery("select @@global.tidb_enable_extended_stats").Check(testkit.Rows("0")) + + // Test issue #22145 + tk.MustExec(`set global sync_relay_log = "'"`) + } func (s *testSuite5) TestTruncateIncorrectIntSessionVar(c *C) { diff --git a/session/session.go b/session/session.go index 0a950d89599f6..cc45f45e07420 100644 --- a/session/session.go +++ b/session/session.go @@ -74,7 +74,6 @@ import ( "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" - "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" @@ -1074,13 +1073,17 @@ func (s *session) SetGlobalSysVar(name, value string) error { } variable.CheckDeprecationSetSystemVar(s.sessionVars, name) sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, name, - stringutil.Escape(sVal, s.sessionVars.SQLMode), - ) + mysql.SystemDB, mysql.GlobalVariablesTable, name, escapeUserString(sVal)) _, _, err = s.ExecRestrictedSQL(sql) return err } +// escape user supplied string for internal SQL. Not safe for all cases, since it doesn't +// handle quote-type, sql-mode, character set breakout. +func escapeUserString(str string) string { + return strings.ReplaceAll(str, `'`, `\'`) +} + // SetTiKVGlobalSysVar handles tikv_* sysvars which need to update mysql.tidb // for backwards compatibility. Validation has already been performed. func (s *session) SetTiKVGlobalSysVar(name, val string) error { @@ -1100,7 +1103,7 @@ func (s *session) SetTiKVGlobalSysVar(name, val string) error { case variable.TiKVGCEnable, variable.TiKVGCRunInterval, variable.TiKVGCLifetime, variable.TiKVGCMode, variable.TiKVGCScanLockMode: val = onOffToTrueFalse(val) sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%[1]s', '%[2]s', '%[3]s') - ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, name, stringutil.Escape(val, s.sessionVars.SQLMode), gcVariableComments[name]) + ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, name, escapeUserString(val), gcVariableComments[name]) _, _, err := s.ExecRestrictedSQL(sql) return err } @@ -1159,14 +1162,14 @@ func (s *session) GetTiKVGlobalSysVar(name, val string) (string, error) { zap.String("tblValue", tblValue), zap.String("restoredValue", val)) sql := fmt.Sprintf(`REPLACE INTO mysql.tidb (variable_name, variable_value, comment) - VALUES ('%s', '%s', '%s')`, name, stringutil.Escape(val, s.sessionVars.SQLMode), gcVariableComments[name]) + VALUES ('%s', '%s', '%s')`, name, escapeUserString(val), gcVariableComments[name]) _, _, err = s.ExecRestrictedSQL(sql) return val, err } if validatedVal != val { // The sysvar value is out of sync. sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, name, stringutil.Escape(validatedVal, s.sessionVars.SQLMode)) + mysql.SystemDB, mysql.GlobalVariablesTable, name, escapeUserString(validatedVal)) _, _, err = s.ExecRestrictedSQL(sql) return validatedVal, err } From 8a731984e2e48d15badd9d9e516f5876b860e355 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sun, 3 Jan 2021 22:51:14 -0700 Subject: [PATCH 09/15] restore original store/tikv/gcworker 1 --- ddl/serial_test.go | 2 +- executor/executor_test.go | 4 +- server/http_handler_test.go | 2 +- store/tikv/gcworker/gc_worker.go | 310 ++++++++++++++++++-------- store/tikv/gcworker/gc_worker_test.go | 176 +++++++-------- util/gcutil/gcutil.go | 1 - 6 files changed, 299 insertions(+), 196 deletions(-) mode change 100755 => 100644 store/tikv/gcworker/gc_worker.go diff --git a/ddl/serial_test.go b/ddl/serial_test.go index b41ef3628ab29..4abfb57f70c3d 100644 --- a/ddl/serial_test.go +++ b/ddl/serial_test.go @@ -622,7 +622,7 @@ func (s *testSerialSuite) TestRecoverTableByJobID(c *C) { ON DUPLICATE KEY UPDATE variable_value = '%[1]s'` // clear GC variables first. - tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point' )") + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") tk.MustExec("insert into t_recover values (1),(2),(3)") tk.MustExec("drop table t_recover") diff --git a/executor/executor_test.go b/executor/executor_test.go index 35d7ebbfafd2f..4d2ab4d13f06b 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -5363,7 +5363,7 @@ func (s *testRecoverTable) TestRecoverTable(c *C) { ON DUPLICATE KEY UPDATE variable_value = '%[1]s'` // clear GC variables first. - tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point' )") + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") tk.MustExec("insert into t_recover values (1),(2),(3)") tk.MustExec("drop table t_recover") @@ -5473,7 +5473,7 @@ func (s *testRecoverTable) TestFlashbackTable(c *C) { ON DUPLICATE KEY UPDATE variable_value = '%[1]s'` // Clear GC variables first. - tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point' )") + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") // Set GC safe point tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) // Set GC enable. diff --git a/server/http_handler_test.go b/server/http_handler_test.go index c967380bff268..fd1c3a95a7fac 100644 --- a/server/http_handler_test.go +++ b/server/http_handler_test.go @@ -673,7 +673,7 @@ func (ts *HTTPHandlerTestSuite) TestTiFlashReplica(c *C) { ddl.EmulatorGCDisable() gcTimeFormat := "20060102-15:04:05 -0700 MST" timeBeforeDrop := time.Now().Add(0 - 48*60*60*time.Second).Format(gcTimeFormat) - safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', ''),('tikv_gc_enable','true','') ON DUPLICATE KEY UPDATE variable_value = '%[1]s'` // Set GC safe point and enable GC. diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go old mode 100755 new mode 100644 index bff9425c28c94..6a030cda6832e --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -23,6 +23,7 @@ import ( "os" "sort" "strconv" + "strings" "sync" "time" @@ -109,27 +110,63 @@ const ( booleanTrue = "true" booleanFalse = "false" - gcWorkerTickInterval = time.Minute - gcWorkerLease = time.Minute * 2 - gcLeaderUUIDKey = "tikv_gc_leader_uuid" - gcLeaderDescKey = "tikv_gc_leader_desc" - gcLeaderLeaseKey = "tikv_gc_leader_lease" - gcLastRunTimeKey = "tikv_gc_last_run_time" - gcWaitTime = time.Minute * 1 - gcRedoDeleteRangeDelay = 24 * time.Hour - gcSafePointKey = "tikv_gc_safe_point" + gcWorkerTickInterval = time.Minute + gcWorkerLease = time.Minute * 2 + gcLeaderUUIDKey = "tikv_gc_leader_uuid" + gcLeaderDescKey = "tikv_gc_leader_desc" + gcLeaderLeaseKey = "tikv_gc_leader_lease" + + gcLastRunTimeKey = "tikv_gc_last_run_time" + gcRunIntervalKey = "tikv_gc_run_interval" + gcDefaultRunInterval = time.Minute * 10 + gcWaitTime = time.Minute * 1 + gcRedoDeleteRangeDelay = 24 * time.Hour + + gcLifeTimeKey = "tikv_gc_life_time" + gcDefaultLifeTime = time.Minute * 10 + gcMinLifeTime = time.Minute * 10 + gcSafePointKey = "tikv_gc_safe_point" + gcConcurrencyKey = "tikv_gc_concurrency" + gcDefaultConcurrency = 2 + gcMinConcurrency = 1 + gcMaxConcurrency = 128 + // We don't want gc to sweep out the cached info belong to other processes, like coprocessor. + gcScanLockLimit = tikv.ResolvedCacheSize / 2 + + gcEnableKey = "tikv_gc_enable" + gcDefaultEnableValue = true + + gcModeKey = "tikv_gc_mode" + gcModeCentral = "central" + gcModeDistributed = "distributed" + gcModeDefault = gcModeDistributed + + gcScanLockModeKey = "tikv_gc_scan_lock_mode" + gcScanLockModeLegacy = "legacy" + gcScanLockModePhysical = "physical" + gcScanLockModeDefault = gcScanLockModePhysical + + gcAutoConcurrencyKey = "tikv_gc_auto_concurrency" + gcDefaultAutoConcurrency = true + gcWorkerServiceSafePointID = "gc_worker" - gcScanLockLimit = tikv.ResolvedCacheSize / 2 ) var gcSafePointCacheInterval = tikv.GcSafePointCacheInterval var gcVariableComments = map[string]string{ - gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)", - gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)", - gcLeaderLeaseKey: "Current GC worker leader lease. (DO NOT EDIT)", - gcLastRunTimeKey: "The time when last GC starts. (DO NOT EDIT)", - gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", + gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)", + gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)", + gcLeaderLeaseKey: "Current GC worker leader lease. (DO NOT EDIT)", + gcLastRunTimeKey: "The time when last GC starts. (DO NOT EDIT)", + gcRunIntervalKey: "GC run interval, at least 10m, in Go format.", + gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.", + gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", + gcConcurrencyKey: "How many goroutines used to do GC parallel, [1, 128], default 2", + gcEnableKey: "Current GC enable status", + gcModeKey: "Mode of GC, \"central\" or \"distributed\"", + gcAutoConcurrencyKey: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", + gcScanLockModeKey: "Mode of scanning locks, \"physical\" or \"legacy\"", } func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) { @@ -195,8 +232,8 @@ func (w *GCWorker) tick(ctx context.Context) { } } else { // Config metrics should always be updated by leader, set them to 0 when current instance is not leader. - metrics.GCConfigGauge.WithLabelValues(variable.TiKVGCRunInterval).Set(0) - metrics.GCConfigGauge.WithLabelValues(variable.TiKVGCLifetime).Set(0) + metrics.GCConfigGauge.WithLabelValues(gcRunIntervalKey).Set(0) + metrics.GCConfigGauge.WithLabelValues(gcLifeTimeKey).Set(0) } } @@ -270,41 +307,12 @@ func (w *GCWorker) prepare() (bool, uint64, error) { return doGC, safePoint, errors.Trace(err) } -func (w *GCWorker) initTableValues() error { - - // This is a static set of data observed in TiDB on a default install *prior* to moving tikv values to sysVars. - // It is no longer needed, but some examples in the wild and in manual pages use "UPDATE". - // This requires a row to exist for the operation to succeed. Because the ON DUPLICATE KEY UPDATE - // action is UPDATE variable_name = variable_name, it is a noop if the row already exists. - - stmt := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES - ('tikv_gc_enable', 'true', 'Current GC enable status'), - ('tikv_gc_run_interval', '10m0s', 'GC run interval, at least 10m, in Go format.'), - ('tikv_gc_life_time', '10m0s', 'All versions within life time will not be collected by GC, at least 10m, in Go format.') - ON DUPLICATE KEY - UPDATE variable_name = variable_name` - - se := createSession(w.store) - defer se.Close() - _, err := se.Execute(context.Background(), stmt) - return err - -} - func (w *GCWorker) checkPrepare(ctx context.Context) (bool, uint64, error) { - se := createSession(w.store) - defer se.Close() - enable, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCEnable) + enable, err := w.checkGCEnable() if err != nil { return false, 0, errors.Trace(err) } - - if err := w.initTableValues(); err != nil { // preserve consistency - logutil.Logger(ctx).Warn("[gc worker] could not initialize mysql.tidb values.") - return false, 0, err - } - - if !variable.TiDBOptOn(enable) { + if !enable { logutil.Logger(ctx).Warn("[gc worker] gc status is disabled.") return false, 0, nil } @@ -372,46 +380,77 @@ func (w *GCWorker) getOracleTime() (time.Time, error) { return time.Unix(sec, nsec), nil } -func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) { - se := createSession(w.store) - defer se.Close() - concurrencyStr, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCConcurrency) +func (w *GCWorker) checkGCEnable() (bool, error) { + return w.loadBooleanWithDefault(gcEnableKey, gcDefaultEnableValue) +} + +func (w *GCWorker) checkUseAutoConcurrency() (bool, error) { + return w.loadBooleanWithDefault(gcAutoConcurrencyKey, gcDefaultAutoConcurrency) +} + +func (w *GCWorker) loadBooleanWithDefault(key string, defaultValue bool) (bool, error) { + str, err := w.loadValueFromSysTable(key) if err != nil { - logutil.Logger(ctx).Error("[gc worker] could not fetch tikv_gc_concurrency") - return 2, nil + return false, errors.Trace(err) } - concurrency, err := strconv.ParseInt(concurrencyStr, 10, 32) + if str == "" { + // Save default value for gc enable key. The default value is always true. + defaultValueStr := booleanFalse + if defaultValue { + defaultValueStr = booleanTrue + } + err = w.saveValueToSysTable(key, defaultValueStr) + if err != nil { + return defaultValue, errors.Trace(err) + } + return defaultValue, nil + } + return strings.EqualFold(str, booleanTrue), nil +} + +func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) { + useAutoConcurrency, err := w.checkUseAutoConcurrency() if err != nil { - logutil.Logger(ctx).Error("[gc worker] could not convert TiKVGCConcurrency!", - zap.String("concurrency", concurrencyStr)) - return 2, nil // using previous default + logutil.Logger(ctx).Error("[gc worker] failed to load config gc_auto_concurrency. use default value.", + zap.String("uuid", w.uuid), + zap.Error(err)) + useAutoConcurrency = gcDefaultAutoConcurrency } - if concurrency != -1 { // use an explicit value - return int(concurrency), nil + if !useAutoConcurrency { + return w.loadGCConcurrencyWithDefault() } - // Calculate concurrency from an auto value (-1) - if stores, err := w.getStoresForGC(ctx); err == nil { - autoConcurrency := len(stores) - if autoConcurrency == 0 { - logutil.Logger(ctx).Error("[gc worker] no store is up", - zap.String("uuid", w.uuid)) - return 0, errors.New("[gc worker] no store is up") + stores, err := w.getStoresForGC(ctx) + concurrency := len(stores) + if err != nil { + logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency. use config.", + zap.String("uuid", w.uuid), + zap.Error(err)) + + concurrency, err = w.loadGCConcurrencyWithDefault() + if err != nil { + logutil.Logger(ctx).Error("[gc worker] failed to load gc concurrency from config. use default value.", + zap.String("uuid", w.uuid), + zap.Error(err)) + concurrency = gcDefaultConcurrency } - return autoConcurrency, nil } - return 0, errors.New("[gc worker] Could not getStoresForGC") + + if concurrency == 0 { + logutil.Logger(ctx).Error("[gc worker] no store is up", + zap.String("uuid", w.uuid)) + return 0, errors.New("[gc worker] no store is up") + } + + return concurrency, nil } func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) { - se := createSession(w.store) - defer se.Close() - runIntervalStr, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCRunInterval) - runInterval, err2 := strToDuration(runIntervalStr) - if err != nil || err2 != nil { + runInterval, err := w.loadDurationWithDefault(gcRunIntervalKey, gcDefaultRunInterval) + if err != nil { return false, errors.Trace(err) } - metrics.GCConfigGauge.WithLabelValues(variable.TiKVGCRunInterval).Set(runInterval.Seconds()) + metrics.GCConfigGauge.WithLabelValues(gcRunIntervalKey).Set(runInterval.Seconds()) lastRun, err := w.loadTime(gcLastRunTimeKey) if err != nil { return false, errors.Trace(err) @@ -428,16 +467,30 @@ func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) { return true, nil } +// validateGCLifeTime checks whether life time is small than min gc life time. +func (w *GCWorker) validateGCLifeTime(lifeTime time.Duration) (time.Duration, error) { + if lifeTime >= gcMinLifeTime { + return lifeTime, nil + } + + logutil.BgLogger().Info("[gc worker] invalid gc life time", + zap.Duration("get gc life time", lifeTime), + zap.Duration("min gc life time", gcMinLifeTime)) + + err := w.saveDuration(gcLifeTimeKey, gcMinLifeTime) + return gcMinLifeTime, err +} + func (w *GCWorker) calculateNewSafePoint(ctx context.Context, now time.Time) (*time.Time, uint64, error) { - se := createSession(w.store) - defer se.Close() - lifeTimeStr, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCLifetime) - lifeTime, err2 := strToDuration(lifeTimeStr) - if err != nil || err2 != nil { + lifeTime, err := w.loadDurationWithDefault(gcLifeTimeKey, gcDefaultLifeTime) + if err != nil { return nil, 0, errors.Trace(err) } - - metrics.GCConfigGauge.WithLabelValues(variable.TiKVGCLifetime).Set(lifeTime.Seconds()) + *lifeTime, err = w.validateGCLifeTime(*lifeTime) + if err != nil { + return nil, 0, err + } + metrics.GCConfigGauge.WithLabelValues(gcLifeTimeKey).Set(lifeTime.Seconds()) lastSafePoint, err := w.loadTime(gcSafePointKey) if err != nil { return nil, 0, errors.Trace(err) @@ -827,31 +880,78 @@ func (w *GCWorker) getStoresMapForGC(ctx context.Context) (map[uint64]*metapb.St return storesMap, nil } +func (w *GCWorker) loadGCConcurrencyWithDefault() (int, error) { + str, err := w.loadValueFromSysTable(gcConcurrencyKey) + if err != nil { + return gcDefaultConcurrency, errors.Trace(err) + } + if str == "" { + err = w.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcDefaultConcurrency)) + if err != nil { + return gcDefaultConcurrency, errors.Trace(err) + } + return gcDefaultConcurrency, nil + } + + jobConcurrency, err := strconv.Atoi(str) + if err != nil { + return gcDefaultConcurrency, err + } + + if jobConcurrency < gcMinConcurrency { + jobConcurrency = gcMinConcurrency + } + + if jobConcurrency > gcMaxConcurrency { + jobConcurrency = gcMaxConcurrency + } + + return jobConcurrency, nil +} + func (w *GCWorker) checkUseDistributedGC() (bool, error) { - se := createSession(w.store) - defer se.Close() - str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCMode) + str, err := w.loadValueFromSysTable(gcModeKey) if err != nil { return false, errors.Trace(err) } - // str is validated by the sysVar system, it can only be 'DISTRIBUTED' (default) or 'CENTRAL' - if str == "CENTRAL" { + if str == "" { + err = w.saveValueToSysTable(gcModeKey, gcModeDefault) + if err != nil { + return false, errors.Trace(err) + } + str = gcModeDefault + } + if strings.EqualFold(str, gcModeDistributed) { + return true, nil + } + if strings.EqualFold(str, gcModeCentral) { return false, nil } + logutil.BgLogger().Warn("[gc worker] distributed mode will be used", + zap.String("invalid gc mode", str)) return true, nil } func (w *GCWorker) checkUsePhysicalScanLock() (bool, error) { - se := createSession(w.store) - defer se.Close() - str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCScanLockMode) + str, err := w.loadValueFromSysTable(gcScanLockModeKey) if err != nil { return false, errors.Trace(err) } - // str is validated by sysVar system, it will be PHYSICAL or LEGACY (default) - if str == "PHYSICAL" { + if str == "" { + err = w.saveValueToSysTable(gcScanLockModeKey, gcScanLockModeDefault) + if err != nil { + return false, errors.Trace(err) + } + str = gcScanLockModeDefault + } + if strings.EqualFold(str, gcScanLockModePhysical) { return true, nil } + if strings.EqualFold(str, gcScanLockModeLegacy) { + return false, nil + } + logutil.BgLogger().Warn("[gc worker] legacy scan lock mode will be used", + zap.String("invalid scan lock mode", str)) return false, nil } @@ -1610,7 +1710,16 @@ func (w *GCWorker) loadTime(key string) (*time.Time, error) { return &t, nil } -func strToDuration(str string) (*time.Duration, error) { +func (w *GCWorker) saveDuration(key string, d time.Duration) error { + err := w.saveValueToSysTable(key, d.String()) + return errors.Trace(err) +} + +func (w *GCWorker) loadDuration(key string) (*time.Duration, error) { + str, err := w.loadValueFromSysTable(key) + if err != nil { + return nil, errors.Trace(err) + } if str == "" { return nil, nil } @@ -1621,6 +1730,21 @@ func strToDuration(str string) (*time.Duration, error) { return &d, nil } +func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time.Duration, error) { + d, err := w.loadDuration(key) + if err != nil { + return nil, errors.Trace(err) + } + if d == nil { + err = w.saveDuration(key, def) + if err != nil { + return nil, errors.Trace(err) + } + return &def, nil + } + return d, nil +} + func (w *GCWorker) loadValueFromSysTable(key string) (string, error) { ctx := context.Background() se := createSession(w.store) diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 865642a867408..11b981aaa1758 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -271,17 +271,10 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { c.Assert(lastRun, NotNil) safePoint, err := s.gcWorker.loadTime(gcSafePointKey) c.Assert(err, IsNil) - - se := createSession(s.gcWorker.store) - defer se.Close() - - gcDefaultLifeTime, err := strToDuration("10m0s") // variable.TiKVGCLifetime default - c.Assert(err, IsNil) - s.timeEqual(c, safePoint.Add(*gcDefaultLifeTime), now, 2*time.Second) + s.timeEqual(c, safePoint.Add(gcDefaultLifeTime), now, 2*time.Second) // Change GC run interval. - t := time.Minute * 5 - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCRunInterval, t.String()) + err = s.gcWorker.saveDuration(gcRunIntervalKey, time.Minute*5) c.Assert(err, IsNil) s.oracle.AddOffset(time.Minute * 4) ok, _, err = s.gcWorker.prepare() @@ -293,8 +286,7 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { c.Assert(ok, IsTrue) // Change GC life time. - t = time.Minute * 30 - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCLifetime, t.String()) + err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute*30) c.Assert(err, IsNil) s.oracle.AddOffset(time.Minute * 5) ok, _, err = s.gcWorker.prepare() @@ -311,74 +303,73 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) { s.timeEqual(c, safePoint.Add(time.Minute*30), now, 2*time.Second) // Change GC concurrency. - ctx := context.Background() - concurrencyStr, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCConcurrency) - c.Assert(err, IsNil) - i64, err := strconv.ParseInt(concurrencyStr, 10, 32) + concurrency, err := s.gcWorker.loadGCConcurrencyWithDefault() c.Assert(err, IsNil) - concurrency := int(i64) - c.Assert(concurrency, Equals, -1) // variable.TiKVGCConcurrency default - autoConcurrencyFinalValue, err1 := s.gcWorker.getGCConcurrency(ctx) - c.Assert(err1, IsNil) - c.Assert(autoConcurrencyFinalValue > 0, Equals, true) + c.Assert(concurrency, Equals, gcDefaultConcurrency) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, "1") // set to the min non auto value. + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(gcMinConcurrency)) c.Assert(err, IsNil) - concurrency, err = s.gcWorker.getGCConcurrency(ctx) + concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() c.Assert(err, IsNil) - c.Assert(concurrency, Equals, 1) // the value just set. + c.Assert(concurrency, Equals, gcMinConcurrency) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, "-2") // invalid option - c.Assert(err, NotNil) - concurrency, err = s.gcWorker.getGCConcurrency(ctx) + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(-1)) + c.Assert(err, IsNil) + concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() c.Assert(err, IsNil) - c.Assert(concurrency, Equals, 1) // the previous value. + c.Assert(concurrency, Equals, gcMinConcurrency) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, strconv.Itoa(1000000)) // invalid option - c.Assert(err, NotNil) - concurrency, err = s.gcWorker.getGCConcurrency(ctx) + err = s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(1000000)) + c.Assert(err, IsNil) + concurrency, err = s.gcWorker.loadGCConcurrencyWithDefault() c.Assert(err, IsNil) - c.Assert(concurrency, Equals, 1) // the previous value. + c.Assert(concurrency, Equals, gcMaxConcurrency) // Change GC enable status. s.oracle.AddOffset(time.Minute * 40) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, variable.BoolOff) + err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) c.Assert(err, IsNil) ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) c.Assert(ok, IsFalse) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, variable.BoolOn) + err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) c.Assert(err, IsNil) ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) c.Assert(ok, IsTrue) // Check gc life time small than min. - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCLifetime, time.Minute.String()) - c.Assert(err, NotNil) // one minute is too short. - - t = time.Minute * 10 - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCLifetime, t.String()) + s.oracle.AddOffset(time.Minute * 40) + err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute) c.Assert(err, IsNil) - str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCLifetime) + ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) - lifeTime, err := strToDuration(str) + c.Assert(ok, IsTrue) + lifeTime, err := s.gcWorker.loadDuration(gcLifeTimeKey) c.Assert(err, IsNil) - c.Assert(*lifeTime, Equals, t) + c.Assert(*lifeTime, Equals, gcMinLifeTime) s.oracle.AddOffset(time.Minute * 40) - t = time.Minute * 30 - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCLifetime, t.String()) + err = s.gcWorker.saveDuration(gcLifeTimeKey, time.Minute*30) c.Assert(err, IsNil) ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) c.Assert(ok, IsTrue) - str, err = se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCLifetime) + lifeTime, err = s.gcWorker.loadDuration(gcLifeTimeKey) c.Assert(err, IsNil) - lifeTime, err = strToDuration(str) - c.Assert(err, IsNil) - c.Assert(*lifeTime, Equals, t) + c.Assert(*lifeTime, Equals, 30*time.Minute) + // Change auto concurrency + err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanFalse) + c.Assert(err, IsNil) + useAutoConcurrency, err := s.gcWorker.checkUseAutoConcurrency() + c.Assert(err, IsNil) + c.Assert(useAutoConcurrency, IsFalse) + err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanTrue) + c.Assert(err, IsNil) + useAutoConcurrency, err = s.gcWorker.checkUseAutoConcurrency() + c.Assert(err, IsNil) + c.Assert(useAutoConcurrency, IsTrue) } func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { @@ -414,19 +405,24 @@ func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { } func (s *testGCWorkerSuite) TestGetGCConcurrency(c *C) { + // Pick a concurrency that doesn't equal to the number of stores. + concurrencyConfig := 25 + c.Assert(concurrencyConfig, Not(Equals), len(s.cluster.GetAllStores())) + err := s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(concurrencyConfig)) + c.Assert(err, IsNil) - // Explicitly set the concurrency to -1 (AUTO) and then verify that - // The getGCConcurrency returns the number equal to the number of stores. - - se := createSession(s.gcWorker.store) - defer se.Close() + ctx := context.Background() - err := se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCConcurrency, "-1") + err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanFalse) c.Assert(err, IsNil) + concurrency, err := s.gcWorker.getGCConcurrency(ctx) + c.Assert(err, IsNil) + c.Assert(concurrency, Equals, concurrencyConfig) - ctx := context.Background() - concurrency, err1 := s.gcWorker.getGCConcurrency(ctx) - c.Assert(err1, IsNil) + err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanTrue) + c.Assert(err, IsNil) + concurrency, err = s.gcWorker.getGCConcurrency(ctx) + c.Assert(err, IsNil) c.Assert(concurrency, Equals, len(s.cluster.GetAllStores())) } @@ -437,84 +433,75 @@ func (s *testGCWorkerSuite) TestDoGC(c *C) { gcSafePointCacheInterval = 1 p := s.createGCProbe(c, "k1") - err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), 2) // gcDefaultConcurrency + err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), gcDefaultConcurrency) c.Assert(err, IsNil) s.checkCollected(c, p) p = s.createGCProbe(c, "k1") - err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), 1) // gcMinConcurrency + err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), gcMinConcurrency) c.Assert(err, IsNil) s.checkCollected(c, p) p = s.createGCProbe(c, "k1") - err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), 128) // gcMaxConcurrency + err = s.gcWorker.doGC(ctx, s.mustAllocTs(c), gcMaxConcurrency) c.Assert(err, IsNil) s.checkCollected(c, p) } func (s *testGCWorkerSuite) TestCheckGCMode(c *C) { - - se := createSession(s.gcWorker.store) - defer se.Close() - useDistributedGC, err := s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) c.Assert(useDistributedGC, Equals, true) // Now the row must be set to the default value. - str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCMode) + str, err := s.gcWorker.loadValueFromSysTable(gcModeKey) c.Assert(err, IsNil) - c.Assert(str, Equals, "DISTRIBUTED") + c.Assert(str, Equals, gcModeDistributed) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "cenTraL") + err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) c.Assert(err, IsNil) useDistributedGC, err = s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) c.Assert(useDistributedGC, Equals, false) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "distribuTed") + err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeDistributed) c.Assert(err, IsNil) useDistributedGC, err = s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) c.Assert(useDistributedGC, Equals, true) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "invalid_mode") - c.Assert(err, NotNil) // won't change the value - str, err = se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCMode) + err = s.gcWorker.saveValueToSysTable(gcModeKey, "invalid_mode") c.Assert(err, IsNil) - c.Assert(str, Equals, "DISTRIBUTED") // keeps previous value. - + useDistributedGC, err = s.gcWorker.checkUseDistributedGC() + c.Assert(err, IsNil) + c.Assert(useDistributedGC, Equals, true) } func (s *testGCWorkerSuite) TestCheckScanLockMode(c *C) { - - se := createSession(s.gcWorker.store) - defer se.Close() - usePhysical, err := s.gcWorker.checkUsePhysicalScanLock() c.Assert(err, IsNil) - c.Assert(usePhysical, Equals, true) + c.Assert(usePhysical, Equals, gcScanLockModeDefault == gcScanLockModePhysical) // Now the row must be set to the default value. - str, err := se.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCScanLockMode) + str, err := s.gcWorker.loadValueFromSysTable(gcScanLockModeKey) c.Assert(err, IsNil) - c.Assert(str, Equals, "PHYSICAL") + c.Assert(str, Equals, gcScanLockModeDefault) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCScanLockMode, "legaCY") + err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModeLegacy) c.Assert(err, IsNil) usePhysical, err = s.gcWorker.checkUsePhysicalScanLock() c.Assert(err, IsNil) c.Assert(usePhysical, Equals, false) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCScanLockMode, "phySICAL") + err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, gcScanLockModePhysical) c.Assert(err, IsNil) usePhysical, err = s.gcWorker.checkUsePhysicalScanLock() c.Assert(err, IsNil) c.Assert(usePhysical, Equals, true) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCScanLockMode, "invalid_mode") - c.Assert(err, NotNil) + err = s.gcWorker.saveValueToSysTable(gcScanLockModeKey, "invalid_mode") + c.Assert(err, IsNil) usePhysical, err = s.gcWorker.checkUsePhysicalScanLock() c.Assert(err, IsNil) - c.Assert(usePhysical, Equals, true) // unchanged + c.Assert(usePhysical, Equals, false) } func (s *testGCWorkerSuite) TestNeedsGCOperationForStore(c *C) { @@ -758,18 +745,14 @@ func (c *testGCWorkerClient) SendRequest(ctx context.Context, addr string, req * func (s *testGCWorkerSuite) TestLeaderTick(c *C) { gcSafePointCacheInterval = 0 - veryLong := time.Minute * 10 * 10 // gcDefaultLifeTime * 10 + veryLong := gcDefaultLifeTime * 10 // Avoid failing at interval check. `lastFinish` is checked by os time. s.gcWorker.lastFinish = time.Now().Add(-veryLong) // Use central mode to do this test. - - se := createSession(s.gcWorker.store) - defer se.Close() - - err := se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "CENTRAL") + err := s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) c.Assert(err, IsNil) p := s.createGCProbe(c, "k1") - s.oracle.AddOffset(time.Minute * 10 * 2) // gcDefaultLifeTime * 2 + s.oracle.AddOffset(gcDefaultLifeTime * 2) // Skip if GC is running. s.gcWorker.gcIsRunning = true @@ -782,12 +765,12 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) { c.Assert(err, IsNil) // Skip if prepare failed (disabling GC will make prepare returns ok = false). - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, "OFF") + err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) c.Assert(err, IsNil) err = s.gcWorker.leaderTick(context.Background()) c.Assert(err, IsNil) s.checkNotCollected(c, p) - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, "ON") + err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) c.Assert(err, IsNil) // Reset GC last run time err = s.gcWorker.saveTime(gcLastRunTimeKey, oracle.GetTimeFromTS(s.mustAllocTs(c)).Add(-veryLong)) @@ -822,7 +805,7 @@ func (s *testGCWorkerSuite) TestLeaderTick(c *C) { c.Assert(err, IsNil) s.gcWorker.lastFinish = time.Now().Add(-veryLong) p = s.createGCProbe(c, "k1") - s.oracle.AddOffset(time.Minute * 10 * 2) // gcDefaultLifeTime * 2 + s.oracle.AddOffset(gcDefaultLifeTime * 2) err = s.gcWorker.leaderTick(context.Background()) c.Assert(err, IsNil) @@ -914,10 +897,7 @@ func (s *testGCWorkerSuite) TestRunGCJob(c *C) { c.Assert(err, NotNil) // Test central mode - se := createSession(s.gcWorker.store) - defer se.Close() - - err = se.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCMode, "CENTRAL") + err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) c.Assert(err, IsNil) useDistributedGC, err = s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) diff --git a/util/gcutil/gcutil.go b/util/gcutil/gcutil.go index 6e1d9fdc93bc7..a90a527b28be5 100644 --- a/util/gcutil/gcutil.go +++ b/util/gcutil/gcutil.go @@ -26,7 +26,6 @@ import ( const ( selectVariableValueSQL = `SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name='%s'` - insertVariableValueSQL = `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') ON DUPLICATE KEY UPDATE variable_value = '%[2]s', comment = '%[3]s'` ) // CheckGCEnable is use to check whether GC is enable. From 2d45af34ca1fd4290c89fde082fb1891309e0c06 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 4 Jan 2021 08:39:51 -0700 Subject: [PATCH 10/15] Add tests for TypeDuration, min for TiKVGCRunInterval --- session/session_test.go | 8 ++++++++ sessionctx/variable/sysvar.go | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/session/session_test.go b/session/session_test.go index 969f571ace345..c92ef1cdebb53 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3899,4 +3899,12 @@ func (s *testSessionSerialSuite) TestTiKVSystemVars(c *C) { result = tk.MustQuery("SELECT @@tikv_gc_concurrency;") result.Check(testkit.Rows("-1")) // because auto_concurrency is turned on it takes precedence + _, err := tk.Exec("SET GLOBAL tikv_gc_run_interval = '9m'") // too small + c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tikv_gc_run_interval'") + + tk.MustExec("SET GLOBAL tikv_gc_run_interval = '700000000000ns'") // specified in ns, also valid + + _, err = tk.Exec("SET GLOBAL tikv_gc_run_interval = '11mins'") + c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tikv_gc_run_interval'") // wrong format + } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index c77c8f95cd8b9..629e944195571 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1217,7 +1217,7 @@ var defaultSysVars = []*SysVar{ /* tikv gc metrics */ {Scope: ScopeGlobal, Name: TiKVGCEnable, Value: BoolOn, Type: TypeBool}, - {Scope: ScopeGlobal, Name: TiKVGCRunInterval, Value: "10m0s", Type: TypeDuration, MinValue: 0, MaxValue: math.MaxInt64}, + {Scope: ScopeGlobal, Name: TiKVGCRunInterval, Value: "10m0s", Type: TypeDuration, MinValue: 600000000000, MaxValue: math.MaxInt64}, /* min: 600s in nanoseconds */ {Scope: ScopeGlobal, Name: TiKVGCLifetime, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, {Scope: ScopeGlobal, Name: TiKVGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true}, {Scope: ScopeGlobal, Name: TiKVGCMode, Value: "DISTRIBUTED", Type: TypeEnum, PossibleValues: []string{"DISTRIBUTED", "CENTRAL"}}, From 96cc6366c28967d96bead67520edcd3ca82fb34d Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 4 Jan 2021 23:10:20 -0700 Subject: [PATCH 11/15] Rename variables tikv_ -> tidb_ --- session/session.go | 40 ++++++++++++++++++++------------ session/session_test.go | 24 +++++++++---------- sessionctx/variable/sysvar.go | 12 +++++----- sessionctx/variable/tidb_vars.go | 24 +++++++++---------- util/gcutil/gcutil.go | 6 ++--- 5 files changed, 58 insertions(+), 48 deletions(-) diff --git a/session/session.go b/session/session.go index 37aeb52da47c1..8b3feeeb04dd2 100644 --- a/session/session.go +++ b/session/session.go @@ -99,13 +99,22 @@ var ( ) var gcVariableComments = map[string]string{ - variable.TiKVGCRunInterval: "GC run interval, at least 10m, in Go format.", - variable.TiKVGCLifetime: "All versions within life time will not be collected by GC, at least 10m, in Go format.", - variable.TiKVGCConcurrency: "How many goroutines used to do GC parallel, [1, 128], default 2", - variable.TiKVGCEnable: "Current GC enable status", - variable.TiKVGCMode: "Mode of GC, \"central\" or \"distributed\"", + variable.TiDBGCRunInterval: "GC run interval, at least 10m, in Go format.", + variable.TiDBGCLifetime: "All versions within life time will not be collected by GC, at least 10m, in Go format.", + variable.TiDBGCConcurrency: "How many goroutines used to do GC parallel, [1, 128], default 2", + variable.TiDBGCEnable: "Current GC enable status", + variable.TiDBGCMode: "Mode of GC, \"central\" or \"distributed\"", tiKVGCAutoConcurrency: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", - variable.TiKVGCScanLockMode: "Mode of scanning locks, \"physical\" or \"legacy\"", + variable.TiDBGCScanLockMode: "Mode of scanning locks, \"physical\" or \"legacy\"", +} + +var gcVariableMap = map[string]string{ + variable.TiDBGCRunInterval: "tikv_gc_run_interval", + variable.TiDBGCLifetime: "tikv_gc_life_time", + variable.TiDBGCConcurrency: "tikv_gc_concurrency", + variable.TiDBGCEnable: "tikv_gc_enable", + variable.TiDBGCMode: "tikv_gc_mode", + variable.TiDBGCScanLockMode: "tikv_gc_scan_lock_mode", } // Session context, it is consistent with the lifecycle of a client connection. @@ -1092,7 +1101,7 @@ func escapeUserString(str string) string { // for backwards compatibility. Validation has already been performed. func (s *session) SetTiKVGlobalSysVar(name, val string) error { switch name { - case variable.TiKVGCConcurrency: + case variable.TiDBGCConcurrency: autoConcurrency := "false" if val == "-1" { autoConcurrency = "true" @@ -1104,10 +1113,10 @@ func (s *session) SetTiKVGlobalSysVar(name, val string) error { return err } fallthrough - case variable.TiKVGCEnable, variable.TiKVGCRunInterval, variable.TiKVGCLifetime, variable.TiKVGCMode, variable.TiKVGCScanLockMode: + case variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, variable.TiDBGCMode, variable.TiDBGCScanLockMode: val = onOffToTrueFalse(val) sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%[1]s', '%[2]s', '%[3]s') - ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, name, escapeUserString(val), gcVariableComments[name]) + ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, gcVariableMap[name], escapeUserString(val), gcVariableComments[name]) _, _, err := s.ExecRestrictedSQL(sql) return err } @@ -1140,7 +1149,7 @@ func onOffToTrueFalse(str string) string { // to read from mysql.tidb for backwards compatibility. func (s *session) GetTiKVGlobalSysVar(name, val string) (string, error) { switch name { - case variable.TiKVGCConcurrency: + case variable.TiDBGCConcurrency: // Check if autoconcurrency is set sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, tiKVGCAutoConcurrency) autoConcurrencyVal, err := s.getExecRet(s, sql) @@ -1148,9 +1157,9 @@ func (s *session) GetTiKVGlobalSysVar(name, val string) (string, error) { return "-1", nil // convention for "AUTO" } fallthrough - case variable.TiKVGCEnable, variable.TiKVGCRunInterval, variable.TiKVGCLifetime, - variable.TiKVGCMode, variable.TiKVGCScanLockMode: - sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, name) + case variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, + variable.TiDBGCMode, variable.TiDBGCScanLockMode: + sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, gcVariableMap[name]) tblValue, err := s.getExecRet(s, sql) if err != nil { return val, nil // mysql.tidb value does not exist. @@ -1163,17 +1172,18 @@ func (s *session) GetTiKVGlobalSysVar(name, val string) (string, error) { logutil.Logger(context.Background()).Warn("restoring sysvar value since validating mysql.tidb value failed", zap.Error(err), zap.String("name", name), + zap.String("tblName", gcVariableMap[name]), zap.String("tblValue", tblValue), zap.String("restoredValue", val)) sql := fmt.Sprintf(`REPLACE INTO mysql.tidb (variable_name, variable_value, comment) - VALUES ('%s', '%s', '%s')`, name, escapeUserString(val), gcVariableComments[name]) + VALUES ('%s', '%s', '%s')`, gcVariableMap[name], escapeUserString(val), gcVariableComments[name]) _, _, err = s.ExecRestrictedSQL(sql) return val, err } if validatedVal != val { // The sysvar value is out of sync. sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, name, escapeUserString(validatedVal)) + mysql.SystemDB, mysql.GlobalVariablesTable, gcVariableMap[name], escapeUserString(validatedVal)) _, _, err = s.ExecRestrictedSQL(sql) return validatedVal, err } diff --git a/session/session_test.go b/session/session_test.go index 0c5abe0f2d5e9..1c0b2fe69620f 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3869,43 +3869,43 @@ func (s *testSessionSerialSuite) TestIssue21943(c *C) { func (s *testSessionSerialSuite) TestTiKVSystemVars(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) - result := tk.MustQuery("SHOW GLOBAL VARIABLES LIKE 'tikv_gc_enable'") // default is on from the sysvar - result.Check(testkit.Rows("tikv_gc_enable ON")) + result := tk.MustQuery("SHOW GLOBAL VARIABLES LIKE 'tidb_gc_enable'") // default is on from the sysvar + result.Check(testkit.Rows("tidb_gc_enable ON")) result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_enable'") result.Check(testkit.Rows()) // but no value in the table (yet) because the value has not been set and the GC has never been run // update will set a value in the table - tk.MustExec("SET GLOBAL tikv_gc_enable = 1") + tk.MustExec("SET GLOBAL tidb_gc_enable = 1") result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_enable'") result.Check(testkit.Rows("true")) tk.MustExec("UPDATE mysql.tidb SET variable_value = 'false' WHERE variable_name='tikv_gc_enable'") - result = tk.MustQuery("SELECT @@tikv_gc_enable;") + result = tk.MustQuery("SELECT @@tidb_gc_enable;") result.Check(testkit.Rows("0")) // reads from mysql.tidb value and changes to false - tk.MustExec("SET GLOBAL tikv_gc_concurrency = -1") // sets auto concurrency and concurrency + tk.MustExec("SET GLOBAL tidb_gc_concurrency = -1") // sets auto concurrency and concurrency result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_auto_concurrency'") result.Check(testkit.Rows("true")) result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_concurrency'") result.Check(testkit.Rows("-1")) - tk.MustExec("SET GLOBAL tikv_gc_concurrency = 5") // sets auto concurrency and concurrency + tk.MustExec("SET GLOBAL tidb_gc_concurrency = 5") // sets auto concurrency and concurrency result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_auto_concurrency'") result.Check(testkit.Rows("false")) result = tk.MustQuery("SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_concurrency'") result.Check(testkit.Rows("5")) tk.MustExec("UPDATE mysql.tidb SET variable_value = 'true' WHERE variable_name='tikv_gc_auto_concurrency'") - result = tk.MustQuery("SELECT @@tikv_gc_concurrency;") + result = tk.MustQuery("SELECT @@tidb_gc_concurrency;") result.Check(testkit.Rows("-1")) // because auto_concurrency is turned on it takes precedence - _, err := tk.Exec("SET GLOBAL tikv_gc_run_interval = '9m'") // too small - c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tikv_gc_run_interval'") + _, err := tk.Exec("SET GLOBAL tidb_gc_run_interval = '9m'") // too small + c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tidb_gc_run_interval'") - tk.MustExec("SET GLOBAL tikv_gc_run_interval = '700000000000ns'") // specified in ns, also valid + tk.MustExec("SET GLOBAL tidb_gc_run_interval = '700000000000ns'") // specified in ns, also valid - _, err = tk.Exec("SET GLOBAL tikv_gc_run_interval = '11mins'") - c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tikv_gc_run_interval'") // wrong format + _, err = tk.Exec("SET GLOBAL tidb_gc_run_interval = '11mins'") + c.Assert(err.Error(), Equals, "[variable:1232]Incorrect argument type to variable 'tidb_gc_run_interval'") // wrong format } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 629e944195571..b5d8ca66074d9 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1216,12 +1216,12 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal | ScopeSession, Name: TiDBTrackAggregateMemoryUsage, Value: BoolToOnOff(DefTiDBTrackAggregateMemoryUsage), Type: TypeBool}, /* tikv gc metrics */ - {Scope: ScopeGlobal, Name: TiKVGCEnable, Value: BoolOn, Type: TypeBool}, - {Scope: ScopeGlobal, Name: TiKVGCRunInterval, Value: "10m0s", Type: TypeDuration, MinValue: 600000000000, MaxValue: math.MaxInt64}, /* min: 600s in nanoseconds */ - {Scope: ScopeGlobal, Name: TiKVGCLifetime, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, - {Scope: ScopeGlobal, Name: TiKVGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true}, - {Scope: ScopeGlobal, Name: TiKVGCMode, Value: "DISTRIBUTED", Type: TypeEnum, PossibleValues: []string{"DISTRIBUTED", "CENTRAL"}}, - {Scope: ScopeGlobal, Name: TiKVGCScanLockMode, Value: "PHYSICAL", Type: TypeEnum, PossibleValues: []string{"PHYSICAL", "LEGACY"}}, + {Scope: ScopeGlobal, Name: TiDBGCEnable, Value: BoolOn, Type: TypeBool}, + {Scope: ScopeGlobal, Name: TiDBGCRunInterval, Value: "10m0s", Type: TypeDuration, MinValue: 600000000000, MaxValue: math.MaxInt64}, /* min: 600s in nanoseconds */ + {Scope: ScopeGlobal, Name: TiDBGCLifetime, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, + {Scope: ScopeGlobal, Name: TiDBGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true}, + {Scope: ScopeGlobal, Name: TiDBGCMode, Value: "DISTRIBUTED", Type: TypeEnum, PossibleValues: []string{"DISTRIBUTED", "CENTRAL"}}, + {Scope: ScopeGlobal, Name: TiDBGCScanLockMode, Value: "PHYSICAL", Type: TypeEnum, PossibleValues: []string{"PHYSICAL", "LEGACY"}}, } // SynonymsSysVariables is synonyms of system variables. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 5295cbd47e536..6f2c9cf48cf40 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -521,18 +521,18 @@ const ( // TiDB vars that have only global scope const ( - // TiKVGCEnable description - TiKVGCEnable = "tikv_gc_enable" - // TiKVGCRunInterval description - TiKVGCRunInterval = "tikv_gc_run_interval" - // TiKVGCLifetime description - TiKVGCLifetime = "tikv_gc_life_time" - // TiKVGCConcurrency description - TiKVGCConcurrency = "tikv_gc_concurrency" - // TiKVGCMode description - TiKVGCMode = "tikv_gc_mode" - // TiKVGCScanLockMode description - TiKVGCScanLockMode = "tikv_gc_scan_lock_mode" + // TiDBGCEnable description + TiDBGCEnable = "tidb_gc_enable" + // TiDBGCRunInterval description + TiDBGCRunInterval = "tidb_gc_run_interval" + // TiDBGCLifetime description + TiDBGCLifetime = "tidb_gc_life_time" + // TiDBGCConcurrency description + TiDBGCConcurrency = "tidb_gc_concurrency" + // TiDBGCMode description + TiDBGCMode = "tidb_gc_mode" + // TiDBGCScanLockMode description + TiDBGCScanLockMode = "tidb_gc_scan_lock_mode" ) // Default TiDB system variable values. diff --git a/util/gcutil/gcutil.go b/util/gcutil/gcutil.go index a90a527b28be5..9bf6aabe06fda 100644 --- a/util/gcutil/gcutil.go +++ b/util/gcutil/gcutil.go @@ -30,7 +30,7 @@ const ( // CheckGCEnable is use to check whether GC is enable. func CheckGCEnable(ctx sessionctx.Context) (enable bool, err error) { - val, err := ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiKVGCEnable) + val, err := ctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBGCEnable) if err != nil { return false, errors.Trace(err) } @@ -39,12 +39,12 @@ func CheckGCEnable(ctx sessionctx.Context) (enable bool, err error) { // DisableGC will disable GC enable variable. func DisableGC(ctx sessionctx.Context) error { - return ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, variable.BoolOff) + return ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBGCEnable, variable.BoolOff) } // EnableGC will enable GC enable variable. func EnableGC(ctx sessionctx.Context) error { - return ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiKVGCEnable, variable.BoolOn) + return ctx.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBGCEnable, variable.BoolOn) } // ValidateSnapshot checks that the newly set snapshot time is after GC safe point time. From e1ec48bb2786cd1443215374e2987a68be335b43 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 5 Jan 2021 00:32:38 -0700 Subject: [PATCH 12/15] Update sessionctx/variable/sysvar.go Co-authored-by: Lei Zhao --- sessionctx/variable/sysvar.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index b5d8ca66074d9..a99f3e4757934 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1217,7 +1217,7 @@ var defaultSysVars = []*SysVar{ /* tikv gc metrics */ {Scope: ScopeGlobal, Name: TiDBGCEnable, Value: BoolOn, Type: TypeBool}, - {Scope: ScopeGlobal, Name: TiDBGCRunInterval, Value: "10m0s", Type: TypeDuration, MinValue: 600000000000, MaxValue: math.MaxInt64}, /* min: 600s in nanoseconds */ + {Scope: ScopeGlobal, Name: TiDBGCRunInterval, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, {Scope: ScopeGlobal, Name: TiDBGCLifetime, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, {Scope: ScopeGlobal, Name: TiDBGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true}, {Scope: ScopeGlobal, Name: TiDBGCMode, Value: "DISTRIBUTED", Type: TypeEnum, PossibleValues: []string{"DISTRIBUTED", "CENTRAL"}}, From cf5585c326c53ee73332db052981ae2b8f9f379d Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 5 Jan 2021 11:06:12 -0700 Subject: [PATCH 13/15] Rename tikv functions --- session/session.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/session/session.go b/session/session.go index 8b3feeeb04dd2..ccaee5e489042 100644 --- a/session/session.go +++ b/session/session.go @@ -1023,7 +1023,7 @@ func (s *session) GetAllSysVars() (map[string]string, error) { ret := make(map[string]string, len(rows)) for _, r := range rows { k, v := r.GetString(0), r.GetString(1) - if v, err = s.GetTiKVGlobalSysVar(k, v); err != nil { + if v, err = s.checkForTiDBTableValue(k, v); err != nil { ret[k] = v } } @@ -1053,7 +1053,7 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { return "", err } // Update mysql.tidb values if required - return s.GetTiKVGlobalSysVar(name, sysVar) + return s.checkForTiDBTableValue(name, sysVar) } // SetGlobalSysVar implements GlobalVarAccessor.SetGlobalSysVar interface. @@ -1081,7 +1081,7 @@ func (s *session) SetGlobalSysVar(name, value string) error { } name = strings.ToLower(name) // update mysql.tidb if required. - if err = s.SetTiKVGlobalSysVar(name, sVal); err != nil { + if err = s.setTiDBTableValue(name, sVal); err != nil { return err } variable.CheckDeprecationSetSystemVar(s.sessionVars, name) @@ -1097,9 +1097,9 @@ func escapeUserString(str string) string { return strings.ReplaceAll(str, `'`, `\'`) } -// SetTiKVGlobalSysVar handles tikv_* sysvars which need to update mysql.tidb +// setTiDBTableValue handles tikv_* sysvars which need to update mysql.tidb // for backwards compatibility. Validation has already been performed. -func (s *session) SetTiKVGlobalSysVar(name, val string) error { +func (s *session) setTiDBTableValue(name, val string) error { switch name { case variable.TiDBGCConcurrency: autoConcurrency := "false" @@ -1145,9 +1145,9 @@ func onOffToTrueFalse(str string) string { return str } -// GetTiKVGlobalSysVar handles tikv_* sysvars which need +// checkForTiDBTableValue handles tikv_* sysvars which need // to read from mysql.tidb for backwards compatibility. -func (s *session) GetTiKVGlobalSysVar(name, val string) (string, error) { +func (s *session) checkForTiDBTableValue(name, val string) (string, error) { switch name { case variable.TiDBGCConcurrency: // Check if autoconcurrency is set From b89cb9e5c2105bb2c794d6fdf8a5ac78abe813d6 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 5 Jan 2021 22:06:16 -0700 Subject: [PATCH 14/15] Address reviewer feedback --- session/session.go | 110 ++++++++++++++++++++++++--------------------- 1 file changed, 59 insertions(+), 51 deletions(-) diff --git a/session/session.go b/session/session.go index ccaee5e489042..82d88151c9aac 100644 --- a/session/session.go +++ b/session/session.go @@ -1009,6 +1009,14 @@ func (s *session) getExecRet(ctx sessionctx.Context, sql string) (string, error) return value, nil } +func (s *session) varFromTiDBTable(name string) bool { + switch name { + case variable.TiDBGCConcurrency, variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, variable.TiDBGCMode, variable.TiDBGCScanLockMode: + return true + } + return false +} + // GetAllSysVars implements GlobalVarAccessor.GetAllSysVars interface. func (s *session) GetAllSysVars() (map[string]string, error) { if s.Value(sessionctx.Initing) != nil { @@ -1023,7 +1031,11 @@ func (s *session) GetAllSysVars() (map[string]string, error) { ret := make(map[string]string, len(rows)) for _, r := range rows { k, v := r.GetString(0), r.GetString(1) - if v, err = s.checkForTiDBTableValue(k, v); err != nil { + if s.varFromTiDBTable(k) { + if v, err = s.getTiDBTableValue(k, v); err != nil { + ret[k] = v + } + } else { ret[k] = v } } @@ -1052,8 +1064,11 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { } return "", err } - // Update mysql.tidb values if required - return s.checkForTiDBTableValue(name, sysVar) + // Fetch mysql.tidb values if required + if s.varFromTiDBTable(name) { + return s.getTiDBTableValue(name, sysVar) + } + return sysVar, nil } // SetGlobalSysVar implements GlobalVarAccessor.SetGlobalSysVar interface. @@ -1081,8 +1096,10 @@ func (s *session) SetGlobalSysVar(name, value string) error { } name = strings.ToLower(name) // update mysql.tidb if required. - if err = s.setTiDBTableValue(name, sVal); err != nil { - return err + if s.varFromTiDBTable(name) { + if err = s.setTiDBTableValue(name, sVal); err != nil { + return err + } } variable.CheckDeprecationSetSystemVar(s.sessionVars, name) sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, @@ -1100,8 +1117,7 @@ func escapeUserString(str string) string { // setTiDBTableValue handles tikv_* sysvars which need to update mysql.tidb // for backwards compatibility. Validation has already been performed. func (s *session) setTiDBTableValue(name, val string) error { - switch name { - case variable.TiDBGCConcurrency: + if name == variable.TiDBGCConcurrency { autoConcurrency := "false" if val == "-1" { autoConcurrency = "true" @@ -1112,15 +1128,12 @@ func (s *session) setTiDBTableValue(name, val string) error { if err != nil { return err } - fallthrough - case variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, variable.TiDBGCMode, variable.TiDBGCScanLockMode: - val = onOffToTrueFalse(val) - sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%[1]s', '%[2]s', '%[3]s') - ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, gcVariableMap[name], escapeUserString(val), gcVariableComments[name]) - _, _, err := s.ExecRestrictedSQL(sql) - return err } - return nil // not a TiKV sysVar + val = onOffToTrueFalse(val) + sql := fmt.Sprintf(`INSERT INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY UPDATE variable_value = '%[2]s'`, gcVariableMap[name], escapeUserString(val), gcVariableComments[name]) + _, _, err := s.ExecRestrictedSQL(sql) + return err } // In mysql.tidb the convention has been to store the string value "true"/"false", @@ -1145,51 +1158,46 @@ func onOffToTrueFalse(str string) string { return str } -// checkForTiDBTableValue handles tikv_* sysvars which need +// getTiDBTableValue handles tikv_* sysvars which need // to read from mysql.tidb for backwards compatibility. -func (s *session) checkForTiDBTableValue(name, val string) (string, error) { - switch name { - case variable.TiDBGCConcurrency: +func (s *session) getTiDBTableValue(name, val string) (string, error) { + if name == variable.TiDBGCConcurrency { // Check if autoconcurrency is set sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, tiKVGCAutoConcurrency) autoConcurrencyVal, err := s.getExecRet(s, sql) if err == nil && strings.EqualFold(autoConcurrencyVal, "true") { return "-1", nil // convention for "AUTO" } - fallthrough - case variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, - variable.TiDBGCMode, variable.TiDBGCScanLockMode: - sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, gcVariableMap[name]) - tblValue, err := s.getExecRet(s, sql) - if err != nil { - return val, nil // mysql.tidb value does not exist. - } - // Run validation on the tblValue. This will return an error if it can't be validated, - // but will also make it more consistent: disTribuTeD -> DISTRIBUTED etc - tblValue = trueFalseToOnOff(tblValue) - validatedVal, err := variable.ValidateSetSystemVar(s.sessionVars, name, tblValue, variable.ScopeGlobal) - if err != nil { - logutil.Logger(context.Background()).Warn("restoring sysvar value since validating mysql.tidb value failed", - zap.Error(err), - zap.String("name", name), - zap.String("tblName", gcVariableMap[name]), - zap.String("tblValue", tblValue), - zap.String("restoredValue", val)) - sql := fmt.Sprintf(`REPLACE INTO mysql.tidb (variable_name, variable_value, comment) + } + sql := fmt.Sprintf(`SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';`, gcVariableMap[name]) + tblValue, err := s.getExecRet(s, sql) + if err != nil { + return val, nil // mysql.tidb value does not exist. + } + // Run validation on the tblValue. This will return an error if it can't be validated, + // but will also make it more consistent: disTribuTeD -> DISTRIBUTED etc + tblValue = trueFalseToOnOff(tblValue) + validatedVal, err := variable.ValidateSetSystemVar(s.sessionVars, name, tblValue, variable.ScopeGlobal) + if err != nil { + logutil.Logger(context.Background()).Warn("restoring sysvar value since validating mysql.tidb value failed", + zap.Error(err), + zap.String("name", name), + zap.String("tblName", gcVariableMap[name]), + zap.String("tblValue", tblValue), + zap.String("restoredValue", val)) + sql := fmt.Sprintf(`REPLACE INTO mysql.tidb (variable_name, variable_value, comment) VALUES ('%s', '%s', '%s')`, gcVariableMap[name], escapeUserString(val), gcVariableComments[name]) - _, _, err = s.ExecRestrictedSQL(sql) - return val, err - } - if validatedVal != val { - // The sysvar value is out of sync. - sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, gcVariableMap[name], escapeUserString(validatedVal)) - _, _, err = s.ExecRestrictedSQL(sql) - return validatedVal, err - } - return validatedVal, nil + _, _, err = s.ExecRestrictedSQL(sql) + return val, err + } + if validatedVal != val { + // The sysvar value is out of sync. + sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, + mysql.SystemDB, mysql.GlobalVariablesTable, gcVariableMap[name], escapeUserString(validatedVal)) + _, _, err = s.ExecRestrictedSQL(sql) + return validatedVal, err } - return val, nil // not a TiKV sysVar + return validatedVal, nil } func (s *session) ensureFullGlobalStats() error { From 9bbf7fe2ab87026ad91b2c238ad6945bd7c71c7d Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 6 Jan 2021 12:38:05 -0700 Subject: [PATCH 15/15] Remove GC Mode since it is no longer effective --- session/session.go | 4 +--- sessionctx/variable/sysvar.go | 1 - sessionctx/variable/tidb_vars.go | 12 +++++------- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/session/session.go b/session/session.go index 82d88151c9aac..964dec04e6257 100644 --- a/session/session.go +++ b/session/session.go @@ -103,7 +103,6 @@ var gcVariableComments = map[string]string{ variable.TiDBGCLifetime: "All versions within life time will not be collected by GC, at least 10m, in Go format.", variable.TiDBGCConcurrency: "How many goroutines used to do GC parallel, [1, 128], default 2", variable.TiDBGCEnable: "Current GC enable status", - variable.TiDBGCMode: "Mode of GC, \"central\" or \"distributed\"", tiKVGCAutoConcurrency: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", variable.TiDBGCScanLockMode: "Mode of scanning locks, \"physical\" or \"legacy\"", } @@ -113,7 +112,6 @@ var gcVariableMap = map[string]string{ variable.TiDBGCLifetime: "tikv_gc_life_time", variable.TiDBGCConcurrency: "tikv_gc_concurrency", variable.TiDBGCEnable: "tikv_gc_enable", - variable.TiDBGCMode: "tikv_gc_mode", variable.TiDBGCScanLockMode: "tikv_gc_scan_lock_mode", } @@ -1011,7 +1009,7 @@ func (s *session) getExecRet(ctx sessionctx.Context, sql string) (string, error) func (s *session) varFromTiDBTable(name string) bool { switch name { - case variable.TiDBGCConcurrency, variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, variable.TiDBGCMode, variable.TiDBGCScanLockMode: + case variable.TiDBGCConcurrency, variable.TiDBGCEnable, variable.TiDBGCRunInterval, variable.TiDBGCLifetime, variable.TiDBGCScanLockMode: return true } return false diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index a99f3e4757934..bbfd712fda3e4 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1220,7 +1220,6 @@ var defaultSysVars = []*SysVar{ {Scope: ScopeGlobal, Name: TiDBGCRunInterval, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, {Scope: ScopeGlobal, Name: TiDBGCLifetime, Value: "10m0s", Type: TypeDuration, MinValue: int64(time.Minute * 10), MaxValue: math.MaxInt64}, {Scope: ScopeGlobal, Name: TiDBGCConcurrency, Value: "-1", Type: TypeInt, MinValue: 1, MaxValue: 128, AllowAutoValue: true}, - {Scope: ScopeGlobal, Name: TiDBGCMode, Value: "DISTRIBUTED", Type: TypeEnum, PossibleValues: []string{"DISTRIBUTED", "CENTRAL"}}, {Scope: ScopeGlobal, Name: TiDBGCScanLockMode, Value: "PHYSICAL", Type: TypeEnum, PossibleValues: []string{"PHYSICAL", "LEGACY"}}, } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 6f2c9cf48cf40..45139b4e8e46d 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -521,17 +521,15 @@ const ( // TiDB vars that have only global scope const ( - // TiDBGCEnable description + // TiDBGCEnable turns garbage collection on or OFF TiDBGCEnable = "tidb_gc_enable" - // TiDBGCRunInterval description + // TiDBGCRunInterval sets the interval that GC runs TiDBGCRunInterval = "tidb_gc_run_interval" - // TiDBGCLifetime description + // TiDBGCLifetime sets the retention window of older versions TiDBGCLifetime = "tidb_gc_life_time" - // TiDBGCConcurrency description + // TiDBGCConcurrency sets the concurrency of garbage collection. -1 = AUTO value TiDBGCConcurrency = "tidb_gc_concurrency" - // TiDBGCMode description - TiDBGCMode = "tidb_gc_mode" - // TiDBGCScanLockMode description + // TiDBGCScanLockMode enables the green GC feature (default) TiDBGCScanLockMode = "tidb_gc_scan_lock_mode" )