From c9d8a744202482fdb0d6f6903b35e066bf43afcb Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Thu, 27 Jul 2023 15:18:07 +0800 Subject: [PATCH] executor: global kill 32bits (server ID part) (#44460) ref pingcap/tidb#8854 --- config/config.go | 9 +- domain/domain.go | 105 +++++++++++++++++------ executor/simple_test.go | 2 +- tests/globalkilltest/Dockerfile | 4 +- tests/globalkilltest/config-64.toml | 16 ++++ tests/globalkilltest/global_kill_test.go | 100 +++++++++++++++------ util/globalconn/globalconn.go | 40 ++++++--- util/globalconn/globalconn_test.go | 2 +- util/globalconn/pool.go | 12 +++ util/globalconn/pool_test.go | 7 ++ 10 files changed, 227 insertions(+), 70 deletions(-) create mode 100644 tests/globalkilltest/config-64.toml diff --git a/config/config.go b/config/config.go index 9c79bb96ae478..0417287b7067c 100644 --- a/config/config.go +++ b/config/config.go @@ -265,10 +265,12 @@ type Config struct { // one quarter of the total physical memory in the current system. MaxBallastObjectSize int `toml:"max-ballast-object-size" json:"max-ballast-object-size"` // BallastObjectSize set the initial size of the ballast object, the unit is byte. - BallastObjectSize int `toml:"ballast-object-size" json:"ballast-object-size"` + BallastObjectSize int `toml:"ballast-object-size" json:"ballast-object-size"` + TrxSummary TrxSummary `toml:"transaction-summary" json:"transaction-summary"` // EnableGlobalKill indicates whether to enable global kill. - TrxSummary TrxSummary `toml:"transaction-summary" json:"transaction-summary"` - EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"` + EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"` + // Enable32BitsConnectionID indicates whether to enable 32bits connection ID for global kill. + Enable32BitsConnectionID bool `toml:"enable-32bits-connection-id" json:"enable-32bits-connection-id"` // InitializeSQLFile is a file that will be executed after first bootstrap only. // It can be used to set GLOBAL system variable values InitializeSQLFile string `toml:"initialize-sql-file" json:"initialize-sql-file"` @@ -1054,6 +1056,7 @@ var defaultConf = Config{ EnableForwarding: defTiKVCfg.EnableForwarding, NewCollationsEnabledOnFirstBootstrap: true, EnableGlobalKill: true, + Enable32BitsConnectionID: true, TrxSummary: DefaultTrxSummary(), DisaggregatedTiFlash: false, TiFlashComputeAutoScalerType: tiflashcompute.DefASStr, diff --git a/domain/domain.go b/domain/domain.go index afd2324a14149..b170dc0b063af 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1160,8 +1160,28 @@ func (do *Domain) Init( do.ddl = checker } + // step 1: prepare the info/schema syncer which domain reload needed. + pdCli := do.GetPDClient() + skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard + do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, + do.etcdClient, do.unprefixedEtcdCli, pdCli, do.Store().GetCodec(), + skipRegisterToDashboard) + if err != nil { + return err + } + err = do.initResourceGroupsController(ctx, pdCli) + if err != nil { + return err + } + do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli) + err = do.ddl.SchemaSyncer().Init(ctx) + if err != nil { + return err + } + + // step 2: initialize the global kill, which depends on `globalInfoSyncer`.` if config.GetGlobalConfig().EnableGlobalKill { - do.connIDAllocator = globalconn.NewGlobalAllocator(do.ServerID) + do.connIDAllocator = globalconn.NewGlobalAllocator(do.ServerID, config.GetGlobalConfig().Enable32BitsConnectionID) if do.etcdClient != nil { err := do.acquireServerID(ctx) @@ -1169,6 +1189,9 @@ func (do *Domain) Init( logutil.BgLogger().Error("acquire serverID failed", zap.Error(err)) do.isLostConnectionToPD.Store(1) // will retry in `do.serverIDKeeper` } else { + if err := do.info.StoreServerInfo(context.Background()); err != nil { + return errors.Trace(err) + } do.isLostConnectionToPD.Store(0) } @@ -1182,32 +1205,14 @@ func (do *Domain) Init( do.connIDAllocator = globalconn.NewSimpleAllocator() } - // step 1: prepare the info/schema syncer which domain reload needed. - pdCli := do.GetPDClient() - skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard - do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, - do.etcdClient, do.unprefixedEtcdCli, pdCli, do.Store().GetCodec(), - skipRegisterToDashboard) - if err != nil { - return err - } - err = do.initResourceGroupsController(ctx, pdCli) - if err != nil { - return err - } - do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli) - err = do.ddl.SchemaSyncer().Init(ctx) - if err != nil { - return err - } startReloadTime := time.Now() - // step 2: domain reload the infoSchema. + // step 3: domain reload the infoSchema. err = do.Reload() if err != nil { return err } - // step 3: start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction. + // step 4: start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction. err = do.ddl.Start(sysCtxPool) if err != nil { return err @@ -2867,6 +2872,8 @@ const ( acquireServerIDTimeout = 10 * time.Second retrieveServerIDSessionTimeout = 10 * time.Second + acquire32BitsServerIDRetryCnt = 3 + // reservedConnXXX must be within [0, globalconn.ReservedCount) reservedConnAnalyze = 0 ) @@ -2961,10 +2968,20 @@ func (do *Domain) acquireServerID(ctx context.Context) error { return err } + conflictCnt := 0 for { - // get a random serverID: [1, MaxServerID64] - randServerID := rand.Int63n(int64(globalconn.MaxServerID64)) + 1 // #nosec G404 - key := fmt.Sprintf("%s/%v", serverIDEtcdPath, randServerID) + var proposeServerID uint64 + if config.GetGlobalConfig().Enable32BitsConnectionID { + proposeServerID, err = do.proposeServerID(ctx, conflictCnt) + if err != nil { + return errors.Trace(err) + } + } else { + // get a random serverID: [1, MaxServerID64] + proposeServerID = uint64(rand.Int63n(int64(globalconn.MaxServerID64)) + 1) // #nosec G404 + } + + key := fmt.Sprintf("%s/%v", serverIDEtcdPath, proposeServerID) cmp := clientv3.Compare(clientv3.CreateRevision(key), "=", 0) value := "0" @@ -2977,18 +2994,54 @@ func (do *Domain) acquireServerID(ctx context.Context) error { return err } if !resp.Succeeded { - logutil.BgLogger().Info("proposed random serverID exists, will randomize again", zap.Int64("randServerID", randServerID)) + logutil.BgLogger().Info("propose serverID exists, try again", zap.Uint64("proposeServerID", proposeServerID)) time.Sleep(acquireServerIDRetryInterval) + conflictCnt++ continue } - atomic.StoreUint64(&do.serverID, uint64(randServerID)) + atomic.StoreUint64(&do.serverID, proposeServerID) logutil.BgLogger().Info("acquireServerID", zap.Uint64("serverID", do.ServerID()), zap.String("lease id", strconv.FormatInt(int64(session.Lease()), 16))) return nil } } +// propose server ID by random. +func (do *Domain) proposeServerID(ctx context.Context, conflictCnt int) (uint64, error) { + // get a random server ID in range [min, max] + randomServerID := func(min uint64, max uint64) uint64 { + return uint64(rand.Int63n(int64(max-min+1)) + int64(min)) // #nosec G404 + } + + if conflictCnt < acquire32BitsServerIDRetryCnt { + // get existing server IDs. + allServerInfo, err := infosync.GetAllServerInfo(ctx) + if err != nil { + return 0, errors.Trace(err) + } + if float32(len(allServerInfo)) < 0.9*globalconn.MaxServerID32 { + serverIDs := make(map[uint64]struct{}, len(allServerInfo)) + for _, info := range allServerInfo { + serverID := info.ServerIDGetter() + if serverID <= globalconn.MaxServerID32 { + serverIDs[serverID] = struct{}{} + } + } + + for retry := 0; retry < 15; retry++ { + randServerID := randomServerID(1, globalconn.MaxServerID32) + if _, ok := serverIDs[randServerID]; !ok { + return randServerID, nil + } + } + } + } + + // upgrade to 64 bits. + return randomServerID(globalconn.MaxServerID32+1, globalconn.MaxServerID64), nil +} + func (do *Domain) refreshServerIDTTL(ctx context.Context) error { session, err := do.retrieveServerIDSession(ctx) if err != nil { diff --git a/executor/simple_test.go b/executor/simple_test.go index 873c299659275..5e26f65a474df 100644 --- a/executor/simple_test.go +++ b/executor/simple_test.go @@ -74,7 +74,7 @@ func TestKillStmt(t *testing.T) { result.Check(testkit.Rows("Warning 1105 Parse ConnectionID failed: unexpected connectionID exceeds int64")) // local kill - connIDAllocator := globalconn.NewGlobalAllocator(dom.ServerID) + connIDAllocator := globalconn.NewGlobalAllocator(dom.ServerID, false) killConnID := connIDAllocator.NextID() tk.MustExec("kill " + strconv.FormatUint(killConnID, 10)) result = tk.MustQuery("show warnings") diff --git a/tests/globalkilltest/Dockerfile b/tests/globalkilltest/Dockerfile index 3065fddeb7772..7730586e4070d 100644 --- a/tests/globalkilltest/Dockerfile +++ b/tests/globalkilltest/Dockerfile @@ -34,4 +34,6 @@ COPY go.mod . COPY go.sum . COPY parser/go.mod parser/go.mod COPY parser/go.sum parser/go.sum -RUN GO111MODULE=on go mod download + +ARG GOPROXY +RUN GO111MODULE=on GOPROXY=${GOPROXY} go mod download diff --git a/tests/globalkilltest/config-64.toml b/tests/globalkilltest/config-64.toml new file mode 100644 index 0000000000000..504cd4d3a4baf --- /dev/null +++ b/tests/globalkilltest/config-64.toml @@ -0,0 +1,16 @@ +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +enable-global-kill=true +enable-32bits-connection-id=false diff --git a/tests/globalkilltest/global_kill_test.go b/tests/globalkilltest/global_kill_test.go index 7c9836fe6f269..263b34c3254b3 100644 --- a/tests/globalkilltest/global_kill_test.go +++ b/tests/globalkilltest/global_kill_test.go @@ -61,9 +61,11 @@ const ( msgErrConnectPD = "connect PD err: %v. Establish a cluster with PD & TiKV, and provide PD client path by `--pd=[,]" ) -// GlobakKillSuite is used for automated test of "Global Kill" feature. +// GlobalKillSuite is used for automated test of "Global Kill" feature. // See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md. type GlobalKillSuite struct { + enable32Bits bool + pdCli *clientv3.Client pdErr error @@ -72,8 +74,10 @@ type GlobalKillSuite struct { tikvProc *exec.Cmd } -func createGloabalKillSuite(t *testing.T) *GlobalKillSuite { +func createGlobalKillSuite(t *testing.T, enable32bits bool) *GlobalKillSuite { s := new(GlobalKillSuite) + s.enable32Bits = enable32bits + err := logutil.InitLogger(&logutil.LogConfig{Config: log.Config{Level: *logLevel}}) require.NoError(t, err) @@ -205,6 +209,13 @@ func (s *GlobalKillSuite) cleanCluster() (err error) { return nil } +func (s *GlobalKillSuite) getTiDBConfigPath() string { + if s.enable32Bits { + return "./config.toml" + } + return "./config-64.toml" +} + func (s *GlobalKillSuite) startTiDBWithoutPD(port int, statusPort int) (cmd *exec.Cmd, err error) { cmd = exec.Command(*tidbBinaryPath, "--store=mocktikv", @@ -214,7 +225,7 @@ func (s *GlobalKillSuite) startTiDBWithoutPD(port int, statusPort int) (cmd *exe fmt.Sprintf("--status=%d", statusPort), fmt.Sprintf("--log-file=%s/tidb%d.log", *tmpPath, port), fmt.Sprintf("--log-slow-query=%s/tidb-slow%d.log", *tmpPath, port), - fmt.Sprintf("--config=%s", "./config.toml")) + fmt.Sprintf("--config=%s", s.getTiDBConfigPath())) log.Info("starting tidb", zap.Any("cmd", cmd)) err = cmd.Start() if err != nil { @@ -233,7 +244,7 @@ func (s *GlobalKillSuite) startTiDBWithPD(port int, statusPort int, pdPath strin fmt.Sprintf("--status=%d", statusPort), fmt.Sprintf("--log-file=%s/tidb%d.log", *tmpPath, port), fmt.Sprintf("--log-slow-query=%s/tidb-slow%d.log", *tmpPath, port), - fmt.Sprintf("--config=%s", "./config.toml")) + fmt.Sprintf("--config=%s", s.getTiDBConfigPath())) log.Info("starting tidb", zap.Any("cmd", cmd)) err = cmd.Start() if err != nil { @@ -282,9 +293,10 @@ func (s *GlobalKillSuite) connectTiDB(port int) (db *sql.DB, err error) { addr := fmt.Sprintf("127.0.0.1:%d", port) dsn := fmt.Sprintf("root@(%s)/test", addr) sleepTime := 250 * time.Millisecond + sleepTimeLimit := 1 * time.Second + maxRetryDuration := 20 * time.Second startTime := time.Now() - maxRetry := 10 - for i := 0; i < maxRetry; i++ { + for i := 0; time.Since(startTime) < maxRetryDuration; i++ { db, err = sql.Open("mysql", dsn) if err != nil { log.Warn("open addr failed", @@ -303,16 +315,12 @@ func (s *GlobalKillSuite) connectTiDB(port int) (db *sql.DB, err error) { zap.Int("retry count", i), zap.Error(err), ) - if i == maxRetry-1 { - return - } - err = db.Close() - if err != nil { - return nil, errors.Trace(err) - } + db.Close() time.Sleep(sleepTime) - sleepTime += sleepTime + if sleepTime < sleepTimeLimit { + sleepTime += sleepTime + } } if err != nil { log.Error("connect to server addr failed", @@ -333,7 +341,7 @@ type sleepResult struct { err error } -func (s *GlobalKillSuite) killByCtrlC(t *testing.T, port int, sleepTime int) time.Duration { +func (s *GlobalKillSuite) testKillByCtrlC(t *testing.T, port int, sleepTime int) time.Duration { cli := exec.Command("mysql", "-h127.0.0.1", fmt.Sprintf("-P%d", port), @@ -361,6 +369,11 @@ func (s *GlobalKillSuite) killByCtrlC(t *testing.T, port int, sleepTime int) tim r := <-ch require.NoError(t, err) + if s.enable32Bits { + require.Less(t, r.elapsed, time.Duration(sleepTime)*time.Second) + } else { + require.GreaterOrEqual(t, r.elapsed, time.Duration(sleepTime)*time.Second) + } return r.elapsed } @@ -432,7 +445,15 @@ func (s *GlobalKillSuite) killByKillStatement(t *testing.T, db1 *sql.DB, db2 *sq // [Test Scenario 1] A TiDB without PD, killed by Ctrl+C, and killed by KILL. func TestWithoutPD(t *testing.T) { - s := createGloabalKillSuite(t) + doTestWithoutPD(t, false) +} + +func TestWithoutPD32(t *testing.T) { + doTestWithoutPD(t, true) +} + +func doTestWithoutPD(t *testing.T, enable32Bits bool) { + s := createGlobalKillSuite(t, enable32Bits) var err error port := *tidbStartPort tidb, err := s.startTiDBWithoutPD(port, *tidbStatusPort) @@ -448,17 +469,24 @@ func TestWithoutPD(t *testing.T) { // Test mysql client CTRL-C // mysql client "CTRL-C" truncate connection id to 32bits, and is ignored by TiDB. - elapsed := s.killByCtrlC(t, port, 2) - require.GreaterOrEqual(t, elapsed, 2*time.Second) + s.testKillByCtrlC(t, port, 2) // Test KILL statement - elapsed = s.killByKillStatement(t, db, db, 2) + elapsed := s.killByKillStatement(t, db, db, 2) require.Less(t, elapsed, 2*time.Second) } // [Test Scenario 2] One TiDB with PD, killed by Ctrl+C, and killed by KILL. func TestOneTiDB(t *testing.T) { - s := createGloabalKillSuite(t) + doTestOneTiDB(t, false) +} + +func TestOneTiDB32(t *testing.T) { + doTestOneTiDB(t, true) +} + +func doTestOneTiDB(t *testing.T, enable32Bits bool) { + s := createGlobalKillSuite(t, enable32Bits) port := *tidbStartPort + 1 tidb, err := s.startTiDBWithPD(port, *tidbStatusPort+1, *pdClientPath) require.NoError(t, err) @@ -476,17 +504,24 @@ func TestOneTiDB(t *testing.T) { // Test mysql client CTRL-C // mysql client "CTRL-C" truncate connection id to 32bits, and is ignored by TiDB. // see TiDB's logging for the truncation warning. - elapsed := s.killByCtrlC(t, port, sleepTime) - require.GreaterOrEqual(t, elapsed, sleepTime*time.Second) + s.testKillByCtrlC(t, port, sleepTime) // Test KILL statement - elapsed = s.killByKillStatement(t, db, db, sleepTime) + elapsed := s.killByKillStatement(t, db, db, sleepTime) require.Less(t, elapsed, sleepTime*time.Second) } // [Test Scenario 3] Multiple TiDB nodes, killed {local,remote} by {Ctrl-C,KILL}. func TestMultipleTiDB(t *testing.T) { - s := createGloabalKillSuite(t) + doTestMultipleTiDB(t, false) +} + +func TestMultipleTiDB32(t *testing.T) { + doTestMultipleTiDB(t, true) +} + +func doTestMultipleTiDB(t *testing.T, enable32Bits bool) { + s := createGlobalKillSuite(t, enable32Bits) require.NoErrorf(t, s.pdErr, msgErrConnectPD, s.pdErr) // tidb1 & conn1a,conn1b @@ -519,8 +554,7 @@ func TestMultipleTiDB(t *testing.T) { // kill local by CTRL-C // mysql client "CTRL-C" truncate connection id to 32bits, and is ignored by TiDB. // see TiDB's logging for the truncation warning. - elapsed = s.killByCtrlC(t, port1, sleepTime) - require.GreaterOrEqual(t, elapsed, sleepTime*time.Second) + s.testKillByCtrlC(t, port1, sleepTime) // kill local by KILL elapsed = s.killByKillStatement(t, db1a, db1b, sleepTime) @@ -532,7 +566,15 @@ func TestMultipleTiDB(t *testing.T) { } func TestLostConnection(t *testing.T) { - s := createGloabalKillSuite(t) + doTestLostConnection(t, false) +} + +func TestLostConnection32(t *testing.T) { + doTestLostConnection(t, true) +} + +func doTestLostConnection(t *testing.T, enable32Bits bool) { + s := createGlobalKillSuite(t, enable32Bits) require.NoErrorf(t, s.pdErr, msgErrConnectPD, s.pdErr) // tidb1 @@ -572,7 +614,7 @@ func TestLostConnection(t *testing.T) { // disconnect to PD by shutting down PD process. log.Info("shutdown PD to simulate lost connection to PD.") err = s.stopPD() - log.Info(fmt.Sprintf("pd shutdown: %s", err)) + log.Info(fmt.Sprintf("pd shutdown: %v", err)) require.NoError(t, err) // wait for "lostConnectionToPDTimeout" elapsed. @@ -633,3 +675,5 @@ func TestLostConnection(t *testing.T) { require.Less(t, elapsed, 2*time.Second) } } + +// TODO: test for upgrade 32 -> 64 & downgrade 64 -> 32 diff --git a/util/globalconn/globalconn.go b/util/globalconn/globalconn.go index 4cb46f81eedfc..40e1a6cf1ce53 100644 --- a/util/globalconn/globalconn.go +++ b/util/globalconn/globalconn.go @@ -39,9 +39,13 @@ import ( // // 63 62 41 40 1 0 // +--+---------------------+--------------------------------------+------+ -// | | serverId | local connId |markup| +// | | serverID | local connID |markup| // |=0| (22b) | (40b) | =1 | // +--+---------------------+--------------------------------------+------+ +// +// NOTE: +// 1. `serverId“ in 64 bits version can be less than 2^11. This will happen when the 32 bits local connID has been used up, while `serverID` stay unchanged. +// 2. The local connID of a 32 bits GCID can be the same with another 64 bits GCID. This will not violate the uniqueness of GCID. type GCID struct { ServerID uint64 LocalConnID uint64 @@ -198,28 +202,40 @@ type GlobalAllocator struct { local64 AutoIncPool } -// Is64 indicates allocate 64bits global connection ID or not. -func (g *GlobalAllocator) Is64() bool { +// is64 indicates allocate 64bits global connection ID or not. +func (g *GlobalAllocator) is64() bool { return g.is64bits.Get() != 0 } -// UpgradeTo64 upgrade allocator to 64bits. -func (g *GlobalAllocator) UpgradeTo64() { +// upgradeTo64 upgrade allocator to 64bits. +func (g *GlobalAllocator) upgradeTo64() { g.is64bits.Set(1) + logutil.BgLogger().Info("GlobalAllocator upgrade to 64 bits") +} + +func (g *GlobalAllocator) downgradeTo32() { + g.is64bits.Set(0) + logutil.BgLogger().Info("GlobalAllocator downgrade to 32 bits") } // LocalConnIDAllocator64TryCount is the try count of 64bits local connID allocation. const LocalConnIDAllocator64TryCount = 10 // NewGlobalAllocator creates a GlobalAllocator. -func NewGlobalAllocator(serverIDGetter serverIDGetterFn) *GlobalAllocator { +func NewGlobalAllocator(serverIDGetter serverIDGetterFn, enable32Bits bool) *GlobalAllocator { g := &GlobalAllocator{ serverIDGetter: serverIDGetter, } g.local32.InitExt(1<