From 0f10bef470f45bd862b38592f51ecb92a540896a Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Mon, 17 May 2021 08:13:39 -0600 Subject: [PATCH] domain, session: Add new sysvarcache to replace global values cache (#24359) --- cmd/explaintest/main.go | 3 - ddl/db_change_test.go | 2 - domain/domain.go | 73 ++++++++- domain/global_vars_cache.go | 135 ---------------- domain/global_vars_cache_test.go | 221 -------------------------- domain/sysvar_cache.go | 167 +++++++++++++++++++ executor/executor_test.go | 2 - executor/seqtest/seq_executor_test.go | 2 - infoschema/tables_test.go | 6 - metrics/domain.go | 9 ++ metrics/metrics.go | 1 + planner/core/prepare_test.go | 3 - session/session.go | 88 +++++----- session/session_test.go | 6 - sessionctx/variable/session.go | 9 ++ sessionctx/variable/sysvar.go | 54 +++++-- 16 files changed, 343 insertions(+), 438 deletions(-) delete mode 100644 domain/global_vars_cache.go delete mode 100644 domain/global_vars_cache_test.go create mode 100644 domain/sysvar_cache.go diff --git a/cmd/explaintest/main.go b/cmd/explaintest/main.go index fa5265f7af871..a85c8ce82dd3c 100644 --- a/cmd/explaintest/main.go +++ b/cmd/explaintest/main.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/parser/ast" - "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/logutil" @@ -663,8 +662,6 @@ func main() { log.Fatal(fmt.Sprintf("%s failed", sql), zap.Error(err)) } } - // Wait global variables to reload. - time.Sleep(domain.GlobalVariableCacheExpiry) if _, err = mdb.Exec("set sql_mode='STRICT_TRANS_TABLES'"); err != nil { log.Fatal("set sql_mode='STRICT_TRANS_TABLES' failed", zap.Error(err)) diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index 7c3a0f9ad970f..041f35c7734a8 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1063,7 +1063,6 @@ func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColum _, err = s.se.Execute(context.Background(), "set global tidb_enable_change_column_type = 0") c.Assert(err, IsNil) }() - domain.GetDomain(s.se).GetGlobalVarsCache().Disable() sql1 := "ALTER TABLE t ADD COLUMN f INT GENERATED ALWAYS AS(a+1);" sql2 := "ALTER TABLE t MODIFY COLUMN a tinyint;" @@ -1083,7 +1082,6 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumnAndAddPK(c *C) { _, err = s.se.Execute(context.Background(), "set global tidb_enable_change_column_type = 0") c.Assert(err, IsNil) }() - domain.GetDomain(s.se).GetGlobalVarsCache().Disable() sql1 := "ALTER TABLE t ADD PRIMARY KEY (b) NONCLUSTERED;" sql2 := "ALTER TABLE t MODIFY COLUMN b tinyint;" diff --git a/domain/domain.go b/domain/domain.go index e6ea3d1e2d949..44f6df1aa9086 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -79,7 +79,7 @@ type Domain struct { sysSessionPool *sessionPool exit chan struct{} etcdClient *clientv3.Client - gvc GlobalVariableCache + sysVarCache SysVarCache // replaces GlobalVariableCache slowQuery *topNSlowQueries expensiveQueryHandle *expensivequery.Handle wg sync.WaitGroup @@ -900,6 +900,55 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error { return nil } +// LoadSysVarCacheLoop create a goroutine loads sysvar cache in a loop, +// it should be called only once in BootstrapSession. +func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error { + err := do.sysVarCache.RebuildSysVarCache(ctx) + if err != nil { + return err + } + var watchCh clientv3.WatchChan + duration := 30 * time.Second + if do.etcdClient != nil { + watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey) + } + do.wg.Add(1) + go func() { + defer func() { + do.wg.Done() + logutil.BgLogger().Info("LoadSysVarCacheLoop exited.") + util.Recover(metrics.LabelDomain, "LoadSysVarCacheLoop", nil, false) + }() + var count int + for { + ok := true + select { + case <-do.exit: + return + case _, ok = <-watchCh: + case <-time.After(duration): + } + if !ok { + logutil.BgLogger().Error("LoadSysVarCacheLoop loop watch channel closed") + watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey) + count++ + if count > 10 { + time.Sleep(time.Duration(count) * time.Second) + } + continue + } + count = 0 + logutil.BgLogger().Debug("Rebuilding sysvar cache from etcd watch event.") + err := do.sysVarCache.RebuildSysVarCache(ctx) + metrics.LoadSysVarCacheCounter.WithLabelValues(metrics.RetLabel(err)).Inc() + if err != nil { + logutil.BgLogger().Error("LoadSysVarCacheLoop failed", zap.Error(err)) + } + } + }() + return nil +} + // PrivilegeHandle returns the MySQLPrivilege. func (do *Domain) PrivilegeHandle() *privileges.Handle { return do.privHandle @@ -1278,7 +1327,10 @@ func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle { return do.expensiveQueryHandle } -const privilegeKey = "/tidb/privilege" +const ( + privilegeKey = "/tidb/privilege" + sysVarCacheKey = "/tidb/sysvars" +) // NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches // the key will get notification. @@ -1300,6 +1352,23 @@ func (do *Domain) NotifyUpdatePrivilege(ctx sessionctx.Context) { } } +// NotifyUpdateSysVarCache updates the sysvar cache key in etcd, which other TiDB +// clients are subscribed to for updates. For the caller, the cache is also built +// synchronously so that the effect is immediate. +func (do *Domain) NotifyUpdateSysVarCache(ctx sessionctx.Context) { + if do.etcdClient != nil { + row := do.etcdClient.KV + _, err := row.Put(context.Background(), sysVarCacheKey, "") + if err != nil { + logutil.BgLogger().Warn("notify update sysvar cache failed", zap.Error(err)) + } + } + // update locally + if err := do.sysVarCache.RebuildSysVarCache(ctx); err != nil { + logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err)) + } +} + // ServerID gets serverID. func (do *Domain) ServerID() uint64 { return atomic.LoadUint64(&do.serverID) diff --git a/domain/global_vars_cache.go b/domain/global_vars_cache.go deleted file mode 100644 index 52aa12a5ac955..0000000000000 --- a/domain/global_vars_cache.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2018 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package domain - -import ( - "fmt" - "sync" - "time" - - "github.com/pingcap/parser/ast" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/stmtsummary" - "go.uber.org/zap" - "golang.org/x/sync/singleflight" -) - -// GlobalVariableCache caches global variables. -type GlobalVariableCache struct { - sync.RWMutex - lastModify time.Time - rows []chunk.Row - fields []*ast.ResultField - - // Unit test may like to disable it. - disable bool - SingleFight singleflight.Group -} - -// GlobalVariableCacheExpiry is the global variable cache TTL. -const GlobalVariableCacheExpiry = 2 * time.Second - -// Update updates the global variable cache. -func (gvc *GlobalVariableCache) Update(rows []chunk.Row, fields []*ast.ResultField) { - gvc.Lock() - gvc.lastModify = time.Now() - gvc.rows = rows - gvc.fields = fields - gvc.Unlock() - - checkEnableServerGlobalVar(rows) -} - -// Get gets the global variables from cache. -func (gvc *GlobalVariableCache) Get() (succ bool, rows []chunk.Row, fields []*ast.ResultField) { - gvc.RLock() - defer gvc.RUnlock() - if time.Since(gvc.lastModify) < GlobalVariableCacheExpiry { - succ, rows, fields = !gvc.disable, gvc.rows, gvc.fields - return - } - succ = false - return -} - -type loadResult struct { - rows []chunk.Row - fields []*ast.ResultField -} - -// LoadGlobalVariables will load from global cache first, loadFn will be executed if cache is not valid -func (gvc *GlobalVariableCache) LoadGlobalVariables(loadFn func() ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) { - succ, rows, fields := gvc.Get() - if succ { - return rows, fields, nil - } - fn := func() (interface{}, error) { - resRows, resFields, loadErr := loadFn() - if loadErr != nil { - return nil, loadErr - } - gvc.Update(resRows, resFields) - return &loadResult{resRows, resFields}, nil - } - res, err, _ := gvc.SingleFight.Do("loadGlobalVariable", fn) - if err != nil { - return nil, nil, err - } - loadRes := res.(*loadResult) - return loadRes.rows, loadRes.fields, nil -} - -// Disable disables the global variable cache, used in test only. -func (gvc *GlobalVariableCache) Disable() { - gvc.Lock() - defer gvc.Unlock() - gvc.disable = true -} - -// checkEnableServerGlobalVar processes variables that acts in server and global level. -func checkEnableServerGlobalVar(rows []chunk.Row) { - for _, row := range rows { - sVal := "" - if !row.IsNull(1) { - sVal = row.GetString(1) - } - var err error - switch row.GetString(0) { - case variable.TiDBEnableStmtSummary: - err = stmtsummary.StmtSummaryByDigestMap.SetEnabled(sVal, false) - case variable.TiDBStmtSummaryInternalQuery: - err = stmtsummary.StmtSummaryByDigestMap.SetEnabledInternalQuery(sVal, false) - case variable.TiDBStmtSummaryRefreshInterval: - err = stmtsummary.StmtSummaryByDigestMap.SetRefreshInterval(sVal, false) - case variable.TiDBStmtSummaryHistorySize: - err = stmtsummary.StmtSummaryByDigestMap.SetHistorySize(sVal, false) - case variable.TiDBStmtSummaryMaxStmtCount: - err = stmtsummary.StmtSummaryByDigestMap.SetMaxStmtCount(sVal, false) - case variable.TiDBStmtSummaryMaxSQLLength: - err = stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(sVal, false) - case variable.TiDBCapturePlanBaseline: - variable.CapturePlanBaseline.Set(sVal, false) - } - if err != nil { - logutil.BgLogger().Error(fmt.Sprintf("load global variable %s error", row.GetString(0)), zap.Error(err)) - } - } -} - -// GetGlobalVarsCache gets the global variable cache. -func (do *Domain) GetGlobalVarsCache() *GlobalVariableCache { - return &do.gvc -} diff --git a/domain/global_vars_cache_test.go b/domain/global_vars_cache_test.go deleted file mode 100644 index 7358d709986af..0000000000000 --- a/domain/global_vars_cache_test.go +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package domain - -import ( - "sync" - "sync/atomic" - "time" - - . "github.com/pingcap/check" - "github.com/pingcap/parser/ast" - "github.com/pingcap/parser/charset" - "github.com/pingcap/parser/model" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/stmtsummary" - "github.com/pingcap/tidb/util/testleak" -) - -var _ = SerialSuites(&testGVCSuite{}) - -type testGVCSuite struct{} - -func (gvcSuite *testGVCSuite) TestSimple(c *C) { - defer testleak.AfterTest(c)() - testleak.BeforeTest() - - store, err := mockstore.NewMockStore() - c.Assert(err, IsNil) - defer func() { - err := store.Close() - c.Assert(err, IsNil) - }() - ddlLease := 50 * time.Millisecond - dom := NewDomain(store, ddlLease, 0, 0, mockFactory) - err = dom.Init(ddlLease, sysMockFactory) - c.Assert(err, IsNil) - defer dom.Close() - - // Get empty global vars cache. - gvc := dom.GetGlobalVarsCache() - succ, rows, fields := gvc.Get() - c.Assert(succ, IsFalse) - c.Assert(rows, IsNil) - c.Assert(fields, IsNil) - // Get a variable from global vars cache. - rf := getResultField("c", 1, 0) - rf1 := getResultField("c1", 2, 1) - ft := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ft1 := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ck := chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024) - ck.AppendString(0, "variable1") - ck.AppendString(1, "value1") - row := ck.GetRow(0) - gvc.Update([]chunk.Row{row}, []*ast.ResultField{rf, rf1}) - succ, rows, fields = gvc.Get() - c.Assert(succ, IsTrue) - c.Assert(rows[0], Equals, row) - c.Assert(fields, DeepEquals, []*ast.ResultField{rf, rf1}) - // Disable the cache. - gvc.Disable() - succ, rows, fields = gvc.Get() - c.Assert(succ, IsFalse) - c.Assert(rows[0], Equals, row) - c.Assert(fields, DeepEquals, []*ast.ResultField{rf, rf1}) -} - -func getResultField(colName string, id, offset int) *ast.ResultField { - return &ast.ResultField{ - Column: &model.ColumnInfo{ - Name: model.NewCIStr(colName), - ID: int64(id), - Offset: offset, - FieldType: types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetUTF8, - Collate: charset.CollationUTF8, - }, - }, - TableAsName: model.NewCIStr("tbl"), - DBName: model.NewCIStr("test"), - } -} - -func (gvcSuite *testGVCSuite) TestConcurrentOneFlight(c *C) { - defer testleak.AfterTest(c)() - testleak.BeforeTest() - gvc := &GlobalVariableCache{} - succ, rows, fields := gvc.Get() - c.Assert(succ, IsFalse) - c.Assert(rows, IsNil) - c.Assert(fields, IsNil) - - // Get a variable from global vars cache. - rf := getResultField("c", 1, 0) - rf1 := getResultField("c1", 2, 1) - ft := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ft1 := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ckLow := chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024) - val := "fromStorage" - val1 := "fromStorage1" - ckLow.AppendString(0, val) - ckLow.AppendString(1, val1) - - // Let cache become invalid, and try concurrent load - counter := int32(0) - waitToStart := new(sync.WaitGroup) - waitToStart.Add(1) - gvc.lastModify = time.Now().Add(time.Duration(-10) * time.Second) - loadFunc := func() ([]chunk.Row, []*ast.ResultField, error) { - time.Sleep(100 * time.Millisecond) - atomic.AddInt32(&counter, 1) - return []chunk.Row{ckLow.GetRow(0)}, []*ast.ResultField{rf, rf1}, nil - } - wg := new(sync.WaitGroup) - worker := 100 - resArray := make([]loadResult, worker) - for i := 0; i < worker; i++ { - wg.Add(1) - go func(idx int) { - defer wg.Done() - waitToStart.Wait() - resRow, resField, _ := gvc.LoadGlobalVariables(loadFunc) - resArray[idx].rows = resRow - resArray[idx].fields = resField - }(i) - } - waitToStart.Done() - wg.Wait() - succ, rows, fields = gvc.Get() - c.Assert(counter, Equals, int32(1)) - c.Assert(resArray[0].rows[0].GetString(0), Equals, val) - c.Assert(resArray[0].rows[0].GetString(1), Equals, val1) - for i := 0; i < worker; i++ { - c.Assert(resArray[0].rows[0], Equals, resArray[i].rows[0]) - c.Assert(resArray[i].rows[0].GetString(0), Equals, val) - c.Assert(resArray[i].rows[0].GetString(1), Equals, val1) - } - // Validate cache - c.Assert(succ, IsTrue) - c.Assert(rows[0], Equals, resArray[0].rows[0]) - c.Assert(fields, DeepEquals, []*ast.ResultField{rf, rf1}) -} - -func (gvcSuite *testGVCSuite) TestCheckEnableStmtSummary(c *C) { - defer testleak.AfterTest(c)() - testleak.BeforeTest() - - store, err := mockstore.NewMockStore() - c.Assert(err, IsNil) - defer func() { - err := store.Close() - c.Assert(err, IsNil) - }() - ddlLease := 50 * time.Millisecond - dom := NewDomain(store, ddlLease, 0, 0, mockFactory) - err = dom.Init(ddlLease, sysMockFactory) - c.Assert(err, IsNil) - defer dom.Close() - - gvc := dom.GetGlobalVarsCache() - - rf := getResultField("c", 1, 0) - rf1 := getResultField("c1", 2, 1) - ft := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ft1 := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - - err = stmtsummary.StmtSummaryByDigestMap.SetEnabled("0", false) - c.Assert(err, IsNil) - ck := chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024) - ck.AppendString(0, variable.TiDBEnableStmtSummary) - ck.AppendString(1, "1") - row := ck.GetRow(0) - gvc.Update([]chunk.Row{row}, []*ast.ResultField{rf, rf1}) - c.Assert(stmtsummary.StmtSummaryByDigestMap.Enabled(), Equals, true) - - ck = chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024) - ck.AppendString(0, variable.TiDBEnableStmtSummary) - ck.AppendString(1, "0") - row = ck.GetRow(0) - gvc.Update([]chunk.Row{row}, []*ast.ResultField{rf, rf1}) - c.Assert(stmtsummary.StmtSummaryByDigestMap.Enabled(), Equals, false) -} diff --git a/domain/sysvar_cache.go b/domain/sysvar_cache.go new file mode 100644 index 0000000000000..23c9688ea2f81 --- /dev/null +++ b/domain/sysvar_cache.go @@ -0,0 +1,167 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package domain + +import ( + "context" + "fmt" + "sync" + + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/stmtsummary" + "go.uber.org/zap" +) + +// The sysvar cache replaces the GlobalVariableCache. +// It is an improvement because it operates similar to privilege cache, +// where it caches for 5 minutes instead of 2 seconds, plus it listens on etcd +// for updates from other servers. + +// SysVarCache represents the cache of system variables broken up into session and global scope. +type SysVarCache struct { + sync.RWMutex + global map[string]string + session map[string]string +} + +// GetSysVarCache gets the global variable cache. +func (do *Domain) GetSysVarCache() *SysVarCache { + return &do.sysVarCache +} + +func (svc *SysVarCache) rebuildCacheIfNeeded(ctx sessionctx.Context) (err error) { + svc.RLock() + cacheNeedsRebuild := len(svc.session) == 0 || len(svc.global) == 0 + svc.RUnlock() + if cacheNeedsRebuild { + logutil.BgLogger().Warn("sysvar cache is empty, triggering rebuild") + if err = svc.RebuildSysVarCache(ctx); err != nil { + logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err)) + } + } + return err +} + +// GetSessionCache gets a copy of the session sysvar cache. +// The intention is to copy it directly to the systems[] map +// on creating a new session. +func (svc *SysVarCache) GetSessionCache(ctx sessionctx.Context) (map[string]string, error) { + if err := svc.rebuildCacheIfNeeded(ctx); err != nil { + return nil, err + } + svc.RLock() + defer svc.RUnlock() + // Perform a deep copy since this will be assigned directly to the session + newMap := make(map[string]string, len(svc.session)) + for k, v := range svc.session { + newMap[k] = v + } + return newMap, nil +} + +// GetGlobalVar gets an individual global var from the sysvar cache. +func (svc *SysVarCache) GetGlobalVar(ctx sessionctx.Context, name string) (string, error) { + if err := svc.rebuildCacheIfNeeded(ctx); err != nil { + return "", err + } + svc.RLock() + defer svc.RUnlock() + + if val, ok := svc.global[name]; ok { + return val, nil + } + logutil.BgLogger().Warn("could not find key in global cache", zap.String("name", name)) + return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) +} + +func (svc *SysVarCache) fetchTableValues(ctx sessionctx.Context) (map[string]string, error) { + tableContents := make(map[string]string) + // Copy all variables from the table to tableContents + exec := ctx.(sqlexec.RestrictedSQLExecutor) + stmt, err := exec.ParseWithParams(context.Background(), `SELECT variable_name, variable_value FROM mysql.global_variables`) + if err != nil { + return tableContents, err + } + rows, _, err := exec.ExecRestrictedStmt(context.TODO(), stmt) + if err != nil { + return nil, err + } + for _, row := range rows { + name := row.GetString(0) + val := row.GetString(1) + tableContents[name] = val + } + return tableContents, nil +} + +// RebuildSysVarCache rebuilds the sysvar cache both globally and for session vars. +// It needs to be called when sysvars are added or removed. +func (svc *SysVarCache) RebuildSysVarCache(ctx sessionctx.Context) error { + newSessionCache := make(map[string]string) + newGlobalCache := make(map[string]string) + tableContents, err := svc.fetchTableValues(ctx) + if err != nil { + return err + } + + for _, sv := range variable.GetSysVars() { + sVal := sv.Value + if _, ok := tableContents[sv.Name]; ok { + sVal = tableContents[sv.Name] + } + if sv.HasSessionScope() { + newSessionCache[sv.Name] = sVal + } + if sv.HasGlobalScope() { + newGlobalCache[sv.Name] = sVal + } + // Propagate any changes to the server scoped variables + checkEnableServerGlobalVar(sv.Name, sVal) + } + + logutil.BgLogger().Debug("rebuilding sysvar cache") + + svc.Lock() + defer svc.Unlock() + svc.session = newSessionCache + svc.global = newGlobalCache + return nil +} + +// checkEnableServerGlobalVar processes variables that acts in server and global level. +func checkEnableServerGlobalVar(name, sVal string) { + var err error + switch name { + case variable.TiDBEnableStmtSummary: + err = stmtsummary.StmtSummaryByDigestMap.SetEnabled(sVal, false) + case variable.TiDBStmtSummaryInternalQuery: + err = stmtsummary.StmtSummaryByDigestMap.SetEnabledInternalQuery(sVal, false) + case variable.TiDBStmtSummaryRefreshInterval: + err = stmtsummary.StmtSummaryByDigestMap.SetRefreshInterval(sVal, false) + case variable.TiDBStmtSummaryHistorySize: + err = stmtsummary.StmtSummaryByDigestMap.SetHistorySize(sVal, false) + case variable.TiDBStmtSummaryMaxStmtCount: + err = stmtsummary.StmtSummaryByDigestMap.SetMaxStmtCount(sVal, false) + case variable.TiDBStmtSummaryMaxSQLLength: + err = stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(sVal, false) + case variable.TiDBCapturePlanBaseline: + variable.CapturePlanBaseline.Set(sVal, false) + } + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("load global variable %s error", name), zap.Error(err)) + } +} diff --git a/executor/executor_test.go b/executor/executor_test.go index 5e6b4490f5eb6..7fa0d7b0d10bd 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2267,8 +2267,6 @@ func (s *testSuiteP2) TestSQLMode(c *C) { tk.MustExec("set sql_mode = 'STRICT_TRANS_TABLES'") tk.MustExec("set @@global.sql_mode = ''") - // Disable global variable cache, so load global session variable take effect immediate. - s.domain.GetGlobalVarsCache().Disable() tk2 := testkit.NewTestKit(c, s.store) tk2.MustExec("use test") tk2.MustExec("drop table if exists t2") diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 061e09dcc1315..bcecfc8d52ad4 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1473,8 +1473,6 @@ func (s *seqTestSuite) TestMaxDeltaSchemaCount(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(variable.DefTiDBMaxDeltaSchemaCount)) - gvc := domain.GetDomain(tk.Se).GetGlobalVarsCache() - gvc.Disable() tk.MustExec("set @@global.tidb_max_delta_schema_count= -1") tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '-1'")) diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index ebe4a0620256f..1e5687928f3ad 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -965,8 +965,6 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) { tk.MustExec("set global tidb_enable_stmt_summary = 1") tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) - // Invalidate the cache manually so that tidb_enable_stmt_summary works immediately. - s.dom.GetGlobalVarsCache().Disable() // Disable refreshing summary. tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) @@ -1209,8 +1207,6 @@ func (s *testClusterTableSuite) TestStmtSummaryHistoryTable(c *C) { tk.MustExec("set global tidb_enable_stmt_summary = 1") tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) - // Invalidate the cache manually so that tidb_enable_stmt_summary works immediately. - s.dom.GetGlobalVarsCache().Disable() // Disable refreshing summary. tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) @@ -1266,8 +1262,6 @@ func (s *testTableSuite) TestStmtSummaryInternalQuery(c *C) { tk.MustExec("create global binding for select * from t where t.a = 1 using select * from t ignore index(k) where t.a = 1") tk.MustExec("set global tidb_enable_stmt_summary = 1") tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) - // Invalidate the cache manually so that tidb_enable_stmt_summary works immediately. - s.dom.GetGlobalVarsCache().Disable() // Disable refreshing summary. tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) diff --git a/metrics/domain.go b/metrics/domain.go index f30dbd59e5d32..a05b25dd6a46a 100644 --- a/metrics/domain.go +++ b/metrics/domain.go @@ -60,6 +60,15 @@ var ( Help: "Counter of load privilege", }, []string{LblType}) + // LoadSysVarCacheCounter records the counter of loading sysvars + LoadSysVarCacheCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "domain", + Name: "load_sysvarcache_total", + Help: "Counter of load sysvar cache", + }, []string{LblType}) + SchemaValidatorStop = "stop" SchemaValidatorRestart = "restart" SchemaValidatorReset = "reset" diff --git a/metrics/metrics.go b/metrics/metrics.go index 4a879b5d5423c..542398e7bbdee 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -151,6 +151,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiFlashQueryTotalCounter) prometheus.MustRegister(SmallTxnWriteDuration) prometheus.MustRegister(TxnWriteThroughput) + prometheus.MustRegister(LoadSysVarCacheCounter) tikvmetrics.InitMetrics(TiDB, TiKVClient) tikvmetrics.RegisterMetrics() diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index cd43b3964d59b..d6bfe69f82b39 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -278,9 +278,6 @@ func (s *testPrepareSerialSuite) TestPrepareOverMaxPreparedStmtCount(c *C) { tk.MustExec("set @@global.max_prepared_stmt_count = 2") tk.MustQuery("select @@global.max_prepared_stmt_count").Check(testkit.Rows("2")) - // Disable global variable cache, so load global session variable take effect immediate. - dom.GetGlobalVarsCache().Disable() - // test close session to give up all prepared stmt tk.MustExec(`prepare stmt2 from "select 1"`) prePrepared = readGaugeInt(metrics.PreparedStmtGauge) diff --git a/session/session.go b/session/session.go index e1581d5ed4074..e13be2045e941 100644 --- a/session/session.go +++ b/session/session.go @@ -991,6 +991,7 @@ func (s *session) replaceTableValue(ctx context.Context, tblName string, varName return err } _, _, err = s.ExecRestrictedStmt(ctx, stmt) + domain.GetDomain(s).NotifyUpdateSysVarCache(s) return err } @@ -1011,16 +1012,27 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { // When running bootstrap or upgrade, we should not access global storage. return "", nil } - sysVar, err := s.getTableValue(context.TODO(), mysql.GlobalVariablesTable, name) + + sv := variable.GetSysVar(name) + if sv == nil { + // It might be a recently unregistered sysvar. We should return unknown + // since GetSysVar is the canonical version, but we can update the cache + // so the next request doesn't attempt to load this. + logutil.BgLogger().Info("sysvar does not exist. sysvar cache may be stale", zap.String("name", name)) + return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) + } + + sysVar, err := domain.GetDomain(s).GetSysVarCache().GetGlobalVar(s, name) if err != nil { - if errResultIsEmpty.Equal(err) { - sv := variable.GetSysVar(name) - if sv != nil { - return sv.Value, nil - } - return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) + // The sysvar exists, but there is no cache entry yet. + // This might be because the sysvar was only recently registered. + // In which case it is safe to return the default, but we can also + // update the cache for the future. + logutil.BgLogger().Info("sysvar not in cache yet. sysvar cache may be stale", zap.String("name", name)) + sysVar, err = s.getTableValue(context.TODO(), mysql.GlobalVariablesTable, name) + if err != nil { + return sv.Value, nil } - return "", err } // Fetch mysql.tidb values if required if s.varFromTiDBTable(name) { @@ -1065,12 +1077,7 @@ func (s *session) updateGlobalSysVar(sv *variable.SysVar, value string) error { return err } } - stmt, err := s.ParseWithParams(context.TODO(), "REPLACE %n.%n VALUES (%?, %?)", mysql.SystemDB, mysql.GlobalVariablesTable, sv.Name, value) - if err != nil { - return err - } - _, _, err = s.ExecRestrictedStmt(context.TODO(), stmt) - return err + return s.replaceTableValue(context.TODO(), mysql.GlobalVariablesTable, sv.Name, value) } // setTiDBTableValue handles tikv_* sysvars which need to update mysql.tidb @@ -2330,13 +2337,18 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } } + // Rebuild sysvar cache in a loop + err = dom.LoadSysVarCacheLoop(se) + if err != nil { + return nil, err + } + if len(cfg.Plugin.Load) > 0 { err := plugin.Init(context.Background(), plugin.Config{EtcdClient: dom.GetEtcdClient()}) if err != nil { return nil, err } } - se4, err := createSession(store) if err != nil { return nil, err @@ -2439,7 +2451,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { // CreateSessionWithDomain creates a new Session and binds it with a Domain. // We need this because when we start DDL in Domain, the DDL need a session // to change some system tables. But at that time, we have been already in -// a lock context, which cause we can't call createSesion directly. +// a lock context, which cause we can't call createSession directly. func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, error) { s := &session{ store: store, @@ -2647,38 +2659,30 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { return nil } - var err error - // Use GlobalVariableCache if TiDB just loaded global variables within 2 second ago. - // When a lot of connections connect to TiDB simultaneously, it can protect TiKV meta region from overload. - gvc := domain.GetDomain(s).GetGlobalVarsCache() - loadFunc := func() ([]chunk.Row, []*ast.ResultField, error) { - vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...) - if len(variable.PluginVarNames) > 0 { - vars = append(vars, variable.PluginVarNames...) - } - - stmt, err := s.ParseWithParams(context.TODO(), "select HIGH_PRIORITY * from mysql.global_variables where variable_name in (%?) order by VARIABLE_NAME", vars) - if err != nil { - return nil, nil, errors.Trace(err) - } + vars.CommonGlobalLoaded = true - return s.ExecRestrictedStmt(context.TODO(), stmt) - } - rows, _, err := gvc.LoadGlobalVariables(loadFunc) + // Deep copy sessionvar cache + // Eventually this whole map will be applied to systems[], which is a MySQL behavior. + sessionCache, err := domain.GetDomain(s).GetSysVarCache().GetSessionCache(s) if err != nil { - logutil.BgLogger().Warn("failed to load global variables", - zap.Uint64("conn", s.sessionVars.ConnectionID), zap.Error(err)) return err } - vars.CommonGlobalLoaded = true - - for _, row := range rows { - varName := row.GetString(0) - varVal := row.GetString(1) + for _, varName := range builtinGlobalVariable { + // The item should be in the sessionCache, but due to a strange current behavior there are some Global-only + // vars that are in builtinGlobalVariable. For compatibility we need to fall back to the Global cache on these items. + // TODO: don't load these globals into the session! + var varVal string + var ok bool + if varVal, ok = sessionCache[varName]; !ok { + varVal, err = s.GetGlobalSysVar(varName) + if err != nil { + continue // skip variables that are not loaded. + } + } // `collation_server` is related to `character_set_server`, set `character_set_server` will also set `collation_server`. // We have to make sure we set the `collation_server` with right value. if _, ok := vars.GetSystemVar(varName); !ok || varName == variable.CollationServer { - err = vars.SetSystemVar(varName, varVal) + err = vars.SetSystemVarWithRelaxedValidation(varName, varVal) if err != nil { return err } @@ -2693,8 +2697,6 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { } } } - - vars.CommonGlobalLoaded = true return nil } diff --git a/session/session_test.go b/session/session_test.go index b7cfecc9c5f4c..9845e757470f0 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -213,7 +213,6 @@ func (s *testSessionSuiteBase) SetUpSuite(c *C) { var err error s.dom, err = session.BootstrapSession(s.store) c.Assert(err, IsNil) - s.dom.GetGlobalVarsCache().Disable() } func (s *testSessionSuiteBase) TearDownSuite(c *C) { @@ -639,7 +638,6 @@ func (s *testSessionSuite) TestGlobalVarAccessor(c *C) { c.Assert(v, Equals, varValue2) // For issue 10955, make sure the new session load `max_execution_time` into sessionVars. - s.dom.GetGlobalVarsCache().Disable() tk1.MustExec("set @@global.max_execution_time = 100") tk2 := testkit.NewTestKitWithInit(c, s.store) c.Assert(tk2.Se.GetSessionVars().MaxExecutionTime, Equals, uint64(100)) @@ -2618,8 +2616,6 @@ func (s *testSessionSuite) TestSetGlobalTZ(c *C) { tk.MustQuery("show variables like 'time_zone'").Check(testkit.Rows("time_zone +08:00")) - // Disable global variable cache, so load global session variable take effect immediate. - s.dom.GetGlobalVarsCache().Disable() tk1 := testkit.NewTestKitWithInit(c, s.store) tk1.MustQuery("show variables like 'time_zone'").Check(testkit.Rows("time_zone +00:00")) } @@ -2761,8 +2757,6 @@ func (s *testSessionSuite3) TestEnablePartition(c *C) { tk.MustExec("set tidb_enable_list_partition=on") tk.MustQuery("show variables like 'tidb_enable_list_partition'").Check(testkit.Rows("tidb_enable_list_partition ON")) - // Disable global variable cache, so load global session variable take effect immediate. - s.dom.GetGlobalVarsCache().Disable() tk1 := testkit.NewTestKitWithInit(c, s.store) tk1.MustQuery("show variables like 'tidb_enable_table_partition'").Check(testkit.Rows("tidb_enable_table_partition ON")) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 0c6c74d90a26d..c474e7905fa7b 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1421,6 +1421,15 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { return sv.SetSessionFromHook(s, val) } +// SetSystemVarWithRelaxedValidation sets the value of a system variable for session scope. +// Validation functions are called, but scope validation is skipped. +// Errors are not expected to be returned because this could cause upgrade issues. +func (s *SessionVars) SetSystemVarWithRelaxedValidation(name string, val string) error { + sv := GetSysVar(name) + val = sv.ValidateWithRelaxedValidation(s, val, ScopeSession) + return sv.SetSessionFromHook(s, val) +} + // GetReadableTxnMode returns the session variable TxnMode but rewrites it to "OPTIMISTIC" when it's empty. func (s *SessionVars) GetReadableTxnMode() string { txnMode := s.TxnMode diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 98518fe4af0f0..99c3da8233d68 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -188,6 +188,10 @@ func (sv *SysVar) HasGlobalScope() bool { // Validate checks if system variable satisfies specific restriction. func (sv *SysVar) Validate(vars *SessionVars, value string, scope ScopeFlag) (string, error) { + // Check that the scope is correct first. + if err := sv.validateScope(scope); err != nil { + return value, err + } // Normalize the value and apply validation based on type. // i.e. TypeBool converts 1/on/ON to ON. normalizedValue, err := sv.validateFromType(vars, value, scope) @@ -203,17 +207,6 @@ func (sv *SysVar) Validate(vars *SessionVars, value string, scope ScopeFlag) (st // validateFromType provides automatic validation based on the SysVar's type func (sv *SysVar) validateFromType(vars *SessionVars, value string, scope ScopeFlag) (string, error) { - // Check that the scope is correct and return the appropriate error message. - if sv.ReadOnly || sv.Scope == ScopeNone { - return value, ErrIncorrectScope.FastGenByArgs(sv.Name, "read only") - } - if scope == ScopeGlobal && !sv.HasGlobalScope() { - return value, errLocalVariable.FastGenByArgs(sv.Name) - } - if scope == ScopeSession && !sv.HasSessionScope() { - return value, errGlobalVariable.FastGenByArgs(sv.Name) - } - // The string "DEFAULT" is a special keyword in MySQL, which restores // the compiled sysvar value. In which case we can skip further validation. if strings.EqualFold(value, "DEFAULT") { @@ -245,6 +238,37 @@ func (sv *SysVar) validateFromType(vars *SessionVars, value string, scope ScopeF return value, nil // typeString } +func (sv *SysVar) validateScope(scope ScopeFlag) error { + if sv.ReadOnly || sv.Scope == ScopeNone { + return ErrIncorrectScope.FastGenByArgs(sv.Name, "read only") + } + if scope == ScopeGlobal && !sv.HasGlobalScope() { + return errLocalVariable.FastGenByArgs(sv.Name) + } + if scope == ScopeSession && !sv.HasSessionScope() { + return errGlobalVariable.FastGenByArgs(sv.Name) + } + return nil +} + +// ValidateWithRelaxedValidation normalizes values but can not return errors. +// Normalization+validation needs to be applied when reading values because older versions of TiDB +// may be less sophisticated in normalizing values. But errors should be caught and handled, +// because otherwise there will be upgrade issues. +func (sv *SysVar) ValidateWithRelaxedValidation(vars *SessionVars, value string, scope ScopeFlag) string { + normalizedValue, err := sv.validateFromType(vars, value, scope) + if err != nil { + return normalizedValue + } + if sv.Validation != nil { + normalizedValue, err = sv.Validation(vars, normalizedValue, value, scope) + if err != nil { + return normalizedValue + } + } + return normalizedValue +} + const ( localDayTimeFormat = "15:04" // FullDayTimeFormat is the full format of analyze start time and end time. @@ -485,11 +509,15 @@ func SetSysVar(name string, value string) { sysVars[name].Value = value } -// GetSysVars returns the sysVars list under a RWLock +// GetSysVars deep copies the sysVars list under a RWLock func GetSysVars() map[string]*SysVar { sysVarsLock.RLock() defer sysVarsLock.RUnlock() - return sysVars + copy := make(map[string]*SysVar, len(sysVars)) + for name, sv := range sysVars { + copy[name] = sv + } + return copy } // PluginVarNames is global plugin var names set.