Skip to content

Commit

Permalink
Add more failure tests when tso service loading initial keyspace grou…
Browse files Browse the repository at this point in the history
…ps assignment (#6280)

ref #6232

Add more failure tests when tso service loading initial keyspace groups assignment

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored Apr 7, 2023
1 parent 69ec7fc commit 6bac6b7
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 24 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen
# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER.

["ErrLoadKeyspaceGroupsRetryExhaustd"]
error = '''
load keyspace groups retry exhausted, %s
'''

["ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace groups terminated
Expand Down
25 changes: 13 additions & 12 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,19 @@ var (

// tso errors
var (
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated"))
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated"))
ErrLoadKeyspaceGroupsRetryExhaustd = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("ErrLoadKeyspaceGroupsRetryExhaustd"))
)

// member errors
Expand Down
36 changes: 31 additions & 5 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package tso
import (
"context"
"encoding/json"
"errors"
"fmt"
"path"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand All @@ -46,8 +48,8 @@ const (
// keyspace group assignment
defaultLoadKeyspaceGroupsTimeout = 30 * time.Second
defaultLoadKeyspaceGroupsBatchSize = int64(400)
loadFromEtcdMaxRetryTimes = 6
loadFromEtcdRetryInterval = 500 * time.Millisecond
defaultLoadFromEtcdRetryInterval = 500 * time.Millisecond
defaultLoadFromEtcdMaxRetryTimes = int(defaultLoadKeyspaceGroupsTimeout / defaultLoadFromEtcdRetryInterval)
watchKEtcdChangeRetryInterval = 1 * time.Second
)

Expand Down Expand Up @@ -109,6 +111,7 @@ type KeyspaceGroupManager struct {
// loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment.
loadKeyspaceGroupsTimeout time.Duration
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -139,6 +142,7 @@ func NewKeyspaceGroupManager(
cfg: cfg,
loadKeyspaceGroupsTimeout: defaultLoadKeyspaceGroupsTimeout,
loadKeyspaceGroupsBatchSize: defaultLoadKeyspaceGroupsBatchSize,
loadFromEtcdMaxRetryTimes: defaultLoadFromEtcdMaxRetryTimes,
}

kgm.legacySvcStorage = endpoint.NewStorageEndpoint(
Expand Down Expand Up @@ -286,19 +290,41 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups(
[]string{rootPath, clientv3.GetPrefixRangeEnd(endpoint.KeyspaceGroupIDPrefix())}, "/")
opOption := []clientv3.OpOption{clientv3.WithRange(endKey), clientv3.WithLimit(limit)}

var resp *clientv3.GetResponse
for i := 0; i < loadFromEtcdMaxRetryTimes; i++ {
var (
i int
resp *clientv3.GetResponse
)
for ; i < kgm.loadFromEtcdMaxRetryTimes; i++ {
resp, err = etcdutil.EtcdKVGet(kgm.etcdClient, startKey, opOption...)

failpoint.Inject("delayLoadKeyspaceGroups", func(val failpoint.Value) {
if sleepIntervalSeconds, ok := val.(int); ok && sleepIntervalSeconds > 0 {
time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second)
}
})

failpoint.Inject("loadKeyspaceGroupsTemporaryFail", func(val failpoint.Value) {
if maxFailTimes, ok := val.(int); ok && i < maxFailTimes {
err = errors.New("fail to read from etcd")
failpoint.Continue()
}
})

if err == nil && resp != nil {
break
}

select {
case <-ctx.Done():
return 0, []*endpoint.KeyspaceGroup{}, false, errs.ErrLoadKeyspaceGroupsTerminated
case <-time.After(loadFromEtcdRetryInterval):
case <-time.After(defaultLoadFromEtcdRetryInterval):
}
}

if i == kgm.loadFromEtcdMaxRetryTimes {
return 0, []*endpoint.KeyspaceGroup{}, false, errs.ErrLoadKeyspaceGroupsRetryExhaustd.FastGenByArgs(err)
}

kgs := make([]*endpoint.KeyspaceGroup, 0, len(resp.Kvs))
for _, item := range resp.Kvs {
kg := &endpoint.KeyspaceGroup{}
Expand Down
82 changes: 75 additions & 7 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"time"

"github.com/google/uuid"
"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -168,13 +170,80 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadWithDifferentBatchSize() {
}
}

// TestLoadKeyspaceGroupsTimeout tests there is timeout when loading the initial keyspace group assignment
// from etcd. The initialization of the keyspace group manager should fail.
func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() {
re := suite.Require()

mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1)
re.NotNil(mgr)
defer mgr.Close()

addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0))

// Set the timeout to 1 second and inject the delayLoadKeyspaceGroups to return 3 seconds to let
// the loading sleep 3 seconds.
mgr.loadKeyspaceGroupsTimeout = time.Second
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delayLoadKeyspaceGroups", "return(3)"))
err := mgr.Initialize(true)
// If loading keyspace groups timeout, the initialization should fail with ErrLoadKeyspaceGroupsTerminated.
re.Equal(errs.ErrLoadKeyspaceGroupsTerminated, err)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delayLoadKeyspaceGroups"))
}

// TestLoadKeyspaceGroupsSucceedWithTempFailures tests the initialization should succeed when there are temporary
// failures during loading the initial keyspace group assignment from etcd.
func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTempFailures() {
re := suite.Require()

mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1)
re.NotNil(mgr)
defer mgr.Close()

addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0))

// Set the max retry times to 3 and inject the loadKeyspaceGroupsTemporaryFail to return 2 to let
// loading from etcd fail 2 times but the whole initialization still succeeds.
mgr.loadFromEtcdMaxRetryTimes = 3
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail", "return(2)"))
err := mgr.Initialize(true)
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail"))
}

// TestLoadKeyspaceGroupsFailed tests the initialization should fail when there are too many failures
// during loading the initial keyspace group assignment from etcd.
func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() {
re := suite.Require()

mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1)
re.NotNil(mgr)
defer mgr.Close()

addKeyspaceGroupAssignment(
suite.ctx, suite.etcdClient, true,
mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0))

// Set the max retry times to 3 and inject the loadKeyspaceGroupsTemporaryFail to return 3 to let
// loading from etcd fail 3 times which should cause the whole initialization to fail.
mgr.loadFromEtcdMaxRetryTimes = 3
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail", "return(3)"))
err := mgr.Initialize(true)
re.Error(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail"))
}

// TestWatchAndDynamicallyApplyChanges tests the keyspace group manager watch and dynamically apply
// keyspace groups' membership/distribution meta changes.
func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges() {
re := suite.Require()

// Start with the empty keyspace group assignment.
mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 0)
mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0)
re.NotNil(mgr)
defer mgr.Close()
err := mgr.Initialize(true)
Expand Down Expand Up @@ -274,7 +343,7 @@ func runTestLoadKeyspaceGroupsAssignment(
probabilityAssignToMe int, // percentage of assigning keyspace groups to this host/pod
) {
idsExpected := []int{}
mgr := newUniqueKeyspaceGroupManager(ctx, etcdClient, cfg, 0, loadKeyspaceGroupsBatchSize)
mgr := newUniqueKeyspaceGroupManager(ctx, etcdClient, cfg, loadKeyspaceGroupsBatchSize)
re.NotNil(mgr)
defer mgr.Close()

Expand Down Expand Up @@ -321,8 +390,9 @@ func runTestLoadKeyspaceGroupsAssignment(
}

func newUniqueKeyspaceGroupManager(
ctx context.Context, etcdClient *clientv3.Client, cfg *TestServiceConfig,
loadKeyspaceGroupsTimeout time.Duration, // set to 0 to use the default value
ctx context.Context,
etcdClient *clientv3.Client,
cfg *TestServiceConfig,
loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value
) *KeyspaceGroupManager {
tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.AdvertiseListenAddr}
Expand All @@ -334,9 +404,7 @@ func newUniqueKeyspaceGroupManager(

keyspaceGroupManager := NewKeyspaceGroupManager(
ctx, tsoServiceID, etcdClient, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath, cfg)
if loadKeyspaceGroupsTimeout != 0 {
keyspaceGroupManager.loadKeyspaceGroupsTimeout = loadKeyspaceGroupsTimeout
}

if loadKeyspaceGroupsBatchSize != 0 {
keyspaceGroupManager.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize
}
Expand Down

0 comments on commit 6bac6b7

Please sign in to comment.