Skip to content

Commit

Permalink
domain, session: Add new sysvarcache to replace global values cache (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed May 17, 2021
1 parent d641810 commit 0f10bef
Show file tree
Hide file tree
Showing 16 changed files with 343 additions and 438 deletions.
3 changes: 0 additions & 3 deletions cmd/explaintest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -663,8 +662,6 @@ func main() {
log.Fatal(fmt.Sprintf("%s failed", sql), zap.Error(err))
}
}
// Wait global variables to reload.
time.Sleep(domain.GlobalVariableCacheExpiry)

if _, err = mdb.Exec("set sql_mode='STRICT_TRANS_TABLES'"); err != nil {
log.Fatal("set sql_mode='STRICT_TRANS_TABLES' failed", zap.Error(err))
Expand Down
2 changes: 0 additions & 2 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,6 @@ func (s *testStateChangeSuite) TestParallelAddGeneratedColumnAndAlterModifyColum
_, err = s.se.Execute(context.Background(), "set global tidb_enable_change_column_type = 0")
c.Assert(err, IsNil)
}()
domain.GetDomain(s.se).GetGlobalVarsCache().Disable()

sql1 := "ALTER TABLE t ADD COLUMN f INT GENERATED ALWAYS AS(a+1);"
sql2 := "ALTER TABLE t MODIFY COLUMN a tinyint;"
Expand All @@ -1083,7 +1082,6 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumnAndAddPK(c *C) {
_, err = s.se.Execute(context.Background(), "set global tidb_enable_change_column_type = 0")
c.Assert(err, IsNil)
}()
domain.GetDomain(s.se).GetGlobalVarsCache().Disable()

sql1 := "ALTER TABLE t ADD PRIMARY KEY (b) NONCLUSTERED;"
sql2 := "ALTER TABLE t MODIFY COLUMN b tinyint;"
Expand Down
73 changes: 71 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Domain struct {
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
gvc GlobalVariableCache
sysVarCache SysVarCache // replaces GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle
wg sync.WaitGroup
Expand Down Expand Up @@ -900,6 +900,55 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error {
return nil
}

// LoadSysVarCacheLoop create a goroutine loads sysvar cache in a loop,
// it should be called only once in BootstrapSession.
func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
err := do.sysVarCache.RebuildSysVarCache(ctx)
if err != nil {
return err
}
var watchCh clientv3.WatchChan
duration := 30 * time.Second
if do.etcdClient != nil {
watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey)
}
do.wg.Add(1)
go func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("LoadSysVarCacheLoop exited.")
util.Recover(metrics.LabelDomain, "LoadSysVarCacheLoop", nil, false)
}()
var count int
for {
ok := true
select {
case <-do.exit:
return
case _, ok = <-watchCh:
case <-time.After(duration):
}
if !ok {
logutil.BgLogger().Error("LoadSysVarCacheLoop loop watch channel closed")
watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey)
count++
if count > 10 {
time.Sleep(time.Duration(count) * time.Second)
}
continue
}
count = 0
logutil.BgLogger().Debug("Rebuilding sysvar cache from etcd watch event.")
err := do.sysVarCache.RebuildSysVarCache(ctx)
metrics.LoadSysVarCacheCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
if err != nil {
logutil.BgLogger().Error("LoadSysVarCacheLoop failed", zap.Error(err))
}
}
}()
return nil
}

// PrivilegeHandle returns the MySQLPrivilege.
func (do *Domain) PrivilegeHandle() *privileges.Handle {
return do.privHandle
Expand Down Expand Up @@ -1278,7 +1327,10 @@ func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle {
return do.expensiveQueryHandle
}

const privilegeKey = "/tidb/privilege"
const (
privilegeKey = "/tidb/privilege"
sysVarCacheKey = "/tidb/sysvars"
)

// NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches
// the key will get notification.
Expand All @@ -1300,6 +1352,23 @@ func (do *Domain) NotifyUpdatePrivilege(ctx sessionctx.Context) {
}
}

// NotifyUpdateSysVarCache updates the sysvar cache key in etcd, which other TiDB
// clients are subscribed to for updates. For the caller, the cache is also built
// synchronously so that the effect is immediate.
func (do *Domain) NotifyUpdateSysVarCache(ctx sessionctx.Context) {
if do.etcdClient != nil {
row := do.etcdClient.KV
_, err := row.Put(context.Background(), sysVarCacheKey, "")
if err != nil {
logutil.BgLogger().Warn("notify update sysvar cache failed", zap.Error(err))
}
}
// update locally
if err := do.sysVarCache.RebuildSysVarCache(ctx); err != nil {
logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err))
}
}

// ServerID gets serverID.
func (do *Domain) ServerID() uint64 {
return atomic.LoadUint64(&do.serverID)
Expand Down
135 changes: 0 additions & 135 deletions domain/global_vars_cache.go

This file was deleted.

Loading

0 comments on commit 0f10bef

Please sign in to comment.