Skip to content

Commit

Permalink
executor: global kill 32bits (server ID part) (#44460)
Browse files Browse the repository at this point in the history
ref #8854
  • Loading branch information
pingyu authored Jul 27, 2023
1 parent 5ae5abf commit c9d8a74
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 70 deletions.
9 changes: 6 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,12 @@ type Config struct {
// one quarter of the total physical memory in the current system.
MaxBallastObjectSize int `toml:"max-ballast-object-size" json:"max-ballast-object-size"`
// BallastObjectSize set the initial size of the ballast object, the unit is byte.
BallastObjectSize int `toml:"ballast-object-size" json:"ballast-object-size"`
BallastObjectSize int `toml:"ballast-object-size" json:"ballast-object-size"`
TrxSummary TrxSummary `toml:"transaction-summary" json:"transaction-summary"`
// EnableGlobalKill indicates whether to enable global kill.
TrxSummary TrxSummary `toml:"transaction-summary" json:"transaction-summary"`
EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"`
EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"`
// Enable32BitsConnectionID indicates whether to enable 32bits connection ID for global kill.
Enable32BitsConnectionID bool `toml:"enable-32bits-connection-id" json:"enable-32bits-connection-id"`
// InitializeSQLFile is a file that will be executed after first bootstrap only.
// It can be used to set GLOBAL system variable values
InitializeSQLFile string `toml:"initialize-sql-file" json:"initialize-sql-file"`
Expand Down Expand Up @@ -1054,6 +1056,7 @@ var defaultConf = Config{
EnableForwarding: defTiKVCfg.EnableForwarding,
NewCollationsEnabledOnFirstBootstrap: true,
EnableGlobalKill: true,
Enable32BitsConnectionID: true,
TrxSummary: DefaultTrxSummary(),
DisaggregatedTiFlash: false,
TiFlashComputeAutoScalerType: tiflashcompute.DefASStr,
Expand Down
105 changes: 79 additions & 26 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,15 +1160,38 @@ func (do *Domain) Init(
do.ddl = checker
}

// step 1: prepare the info/schema syncer which domain reload needed.
pdCli := do.GetPDClient()
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID,
do.etcdClient, do.unprefixedEtcdCli, pdCli, do.Store().GetCodec(),
skipRegisterToDashboard)
if err != nil {
return err
}
err = do.initResourceGroupsController(ctx, pdCli)
if err != nil {
return err
}
do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli)
err = do.ddl.SchemaSyncer().Init(ctx)
if err != nil {
return err
}

// step 2: initialize the global kill, which depends on `globalInfoSyncer`.`
if config.GetGlobalConfig().EnableGlobalKill {
do.connIDAllocator = globalconn.NewGlobalAllocator(do.ServerID)
do.connIDAllocator = globalconn.NewGlobalAllocator(do.ServerID, config.GetGlobalConfig().Enable32BitsConnectionID)

if do.etcdClient != nil {
err := do.acquireServerID(ctx)
if err != nil {
logutil.BgLogger().Error("acquire serverID failed", zap.Error(err))
do.isLostConnectionToPD.Store(1) // will retry in `do.serverIDKeeper`
} else {
if err := do.info.StoreServerInfo(context.Background()); err != nil {
return errors.Trace(err)
}
do.isLostConnectionToPD.Store(0)
}

Expand All @@ -1182,32 +1205,14 @@ func (do *Domain) Init(
do.connIDAllocator = globalconn.NewSimpleAllocator()
}

// step 1: prepare the info/schema syncer which domain reload needed.
pdCli := do.GetPDClient()
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID,
do.etcdClient, do.unprefixedEtcdCli, pdCli, do.Store().GetCodec(),
skipRegisterToDashboard)
if err != nil {
return err
}
err = do.initResourceGroupsController(ctx, pdCli)
if err != nil {
return err
}
do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli)
err = do.ddl.SchemaSyncer().Init(ctx)
if err != nil {
return err
}
startReloadTime := time.Now()
// step 2: domain reload the infoSchema.
// step 3: domain reload the infoSchema.
err = do.Reload()
if err != nil {
return err
}

// step 3: start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction.
// step 4: start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction.
err = do.ddl.Start(sysCtxPool)
if err != nil {
return err
Expand Down Expand Up @@ -2867,6 +2872,8 @@ const (
acquireServerIDTimeout = 10 * time.Second
retrieveServerIDSessionTimeout = 10 * time.Second

acquire32BitsServerIDRetryCnt = 3

// reservedConnXXX must be within [0, globalconn.ReservedCount)
reservedConnAnalyze = 0
)
Expand Down Expand Up @@ -2961,10 +2968,20 @@ func (do *Domain) acquireServerID(ctx context.Context) error {
return err
}

conflictCnt := 0
for {
// get a random serverID: [1, MaxServerID64]
randServerID := rand.Int63n(int64(globalconn.MaxServerID64)) + 1 // #nosec G404
key := fmt.Sprintf("%s/%v", serverIDEtcdPath, randServerID)
var proposeServerID uint64
if config.GetGlobalConfig().Enable32BitsConnectionID {
proposeServerID, err = do.proposeServerID(ctx, conflictCnt)
if err != nil {
return errors.Trace(err)
}
} else {
// get a random serverID: [1, MaxServerID64]
proposeServerID = uint64(rand.Int63n(int64(globalconn.MaxServerID64)) + 1) // #nosec G404
}

key := fmt.Sprintf("%s/%v", serverIDEtcdPath, proposeServerID)
cmp := clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
value := "0"

Expand All @@ -2977,18 +2994,54 @@ func (do *Domain) acquireServerID(ctx context.Context) error {
return err
}
if !resp.Succeeded {
logutil.BgLogger().Info("proposed random serverID exists, will randomize again", zap.Int64("randServerID", randServerID))
logutil.BgLogger().Info("propose serverID exists, try again", zap.Uint64("proposeServerID", proposeServerID))
time.Sleep(acquireServerIDRetryInterval)
conflictCnt++
continue
}

atomic.StoreUint64(&do.serverID, uint64(randServerID))
atomic.StoreUint64(&do.serverID, proposeServerID)
logutil.BgLogger().Info("acquireServerID", zap.Uint64("serverID", do.ServerID()),
zap.String("lease id", strconv.FormatInt(int64(session.Lease()), 16)))
return nil
}
}

// propose server ID by random.
func (do *Domain) proposeServerID(ctx context.Context, conflictCnt int) (uint64, error) {
// get a random server ID in range [min, max]
randomServerID := func(min uint64, max uint64) uint64 {
return uint64(rand.Int63n(int64(max-min+1)) + int64(min)) // #nosec G404
}

if conflictCnt < acquire32BitsServerIDRetryCnt {
// get existing server IDs.
allServerInfo, err := infosync.GetAllServerInfo(ctx)
if err != nil {
return 0, errors.Trace(err)
}
if float32(len(allServerInfo)) < 0.9*globalconn.MaxServerID32 {
serverIDs := make(map[uint64]struct{}, len(allServerInfo))
for _, info := range allServerInfo {
serverID := info.ServerIDGetter()
if serverID <= globalconn.MaxServerID32 {
serverIDs[serverID] = struct{}{}
}
}

for retry := 0; retry < 15; retry++ {
randServerID := randomServerID(1, globalconn.MaxServerID32)
if _, ok := serverIDs[randServerID]; !ok {
return randServerID, nil
}
}
}
}

// upgrade to 64 bits.
return randomServerID(globalconn.MaxServerID32+1, globalconn.MaxServerID64), nil
}

func (do *Domain) refreshServerIDTTL(ctx context.Context) error {
session, err := do.retrieveServerIDSession(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion executor/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestKillStmt(t *testing.T) {
result.Check(testkit.Rows("Warning 1105 Parse ConnectionID failed: unexpected connectionID exceeds int64"))

// local kill
connIDAllocator := globalconn.NewGlobalAllocator(dom.ServerID)
connIDAllocator := globalconn.NewGlobalAllocator(dom.ServerID, false)
killConnID := connIDAllocator.NextID()
tk.MustExec("kill " + strconv.FormatUint(killConnID, 10))
result = tk.MustQuery("show warnings")
Expand Down
4 changes: 3 additions & 1 deletion tests/globalkilltest/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ COPY go.mod .
COPY go.sum .
COPY parser/go.mod parser/go.mod
COPY parser/go.sum parser/go.sum
RUN GO111MODULE=on go mod download

ARG GOPROXY
RUN GO111MODULE=on GOPROXY=${GOPROXY} go mod download
16 changes: 16 additions & 0 deletions tests/globalkilltest/config-64.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

enable-global-kill=true
enable-32bits-connection-id=false
Loading

0 comments on commit c9d8a74

Please sign in to comment.