From 6f3f264476c5dcad08904a6dfe244f566ef4b330 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 6 Apr 2023 16:55:11 -0700 Subject: [PATCH 1/2] Add more failure tests when tso service loading initial keyspace groups assignment Signed-off-by: Bin Shi --- pkg/errs/errno.go | 25 +++++----- pkg/tso/keyspace_group_manager.go | 36 ++++++++++++-- pkg/tso/keyspace_group_manager_test.go | 69 ++++++++++++++++++++++++++ 3 files changed, 113 insertions(+), 17 deletions(-) diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 1deb285df94..d14275bb5e3 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -39,18 +39,19 @@ var ( // tso errors var ( - ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig")) - ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator")) - ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator")) - ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS")) - ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp")) - ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp")) - ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow")) - ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout")) - ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid")) - ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager")) - ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout")) - ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated")) + ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig")) + ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator")) + ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator")) + ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS")) + ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp")) + ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp")) + ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow")) + ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout")) + ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid")) + ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager")) + ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout")) + ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated")) + ErrLoadKeyspaceGroupsRetryExhaustd = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("ErrLoadKeyspaceGroupsRetryExhaustd")) ) // member errors diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index e2cfba24658..2f97299fede 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -17,6 +17,7 @@ package tso import ( "context" "encoding/json" + "errors" "fmt" "path" "strings" @@ -24,6 +25,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" @@ -46,8 +48,8 @@ const ( // keyspace group assignment defaultLoadKeyspaceGroupsTimeout = 30 * time.Second defaultLoadKeyspaceGroupsBatchSize = int64(400) - loadFromEtcdMaxRetryTimes = 6 - loadFromEtcdRetryInterval = 500 * time.Millisecond + defaultLoadFromEtcdRetryInterval = 500 * time.Millisecond + defaultLoadFromEtcdMaxRetryTimes = int(defaultLoadKeyspaceGroupsTimeout / defaultLoadFromEtcdRetryInterval) watchKEtcdChangeRetryInterval = 1 * time.Second ) @@ -109,6 +111,7 @@ type KeyspaceGroupManager struct { // loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment. loadKeyspaceGroupsTimeout time.Duration loadKeyspaceGroupsBatchSize int64 + loadFromEtcdMaxRetryTimes int } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. @@ -139,6 +142,7 @@ func NewKeyspaceGroupManager( cfg: cfg, loadKeyspaceGroupsTimeout: defaultLoadKeyspaceGroupsTimeout, loadKeyspaceGroupsBatchSize: defaultLoadKeyspaceGroupsBatchSize, + loadFromEtcdMaxRetryTimes: defaultLoadFromEtcdMaxRetryTimes, } kgm.legacySvcStorage = endpoint.NewStorageEndpoint( @@ -286,19 +290,41 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups( []string{rootPath, clientv3.GetPrefixRangeEnd(endpoint.KeyspaceGroupIDPrefix())}, "/") opOption := []clientv3.OpOption{clientv3.WithRange(endKey), clientv3.WithLimit(limit)} - var resp *clientv3.GetResponse - for i := 0; i < loadFromEtcdMaxRetryTimes; i++ { + var ( + i int + resp *clientv3.GetResponse + ) + for ; i < kgm.loadFromEtcdMaxRetryTimes; i++ { resp, err = etcdutil.EtcdKVGet(kgm.etcdClient, startKey, opOption...) + + failpoint.Inject("delayLoadKeyspaceGroups", func(val failpoint.Value) { + if sleepIntervalSeconds, ok := val.(int); ok && sleepIntervalSeconds > 0 { + time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second) + } + }) + + failpoint.Inject("loadKeyspaceGroupsTemporaryFail", func(val failpoint.Value) { + if maxFailTimes, ok := val.(int); ok && i < maxFailTimes { + err = errors.New("fail to read from etcd") + failpoint.Continue() + } + }) + if err == nil && resp != nil { break } + select { case <-ctx.Done(): return 0, []*endpoint.KeyspaceGroup{}, false, errs.ErrLoadKeyspaceGroupsTerminated - case <-time.After(loadFromEtcdRetryInterval): + case <-time.After(defaultLoadFromEtcdRetryInterval): } } + if i == kgm.loadFromEtcdMaxRetryTimes { + return 0, []*endpoint.KeyspaceGroup{}, false, errs.ErrLoadKeyspaceGroupsRetryExhaustd.FastGenByArgs(err) + } + kgs := make([]*endpoint.KeyspaceGroup, 0, len(resp.Kvs)) for _, item := range resp.Kvs { kg := &endpoint.KeyspaceGroup{} diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index bbdd1e2cfd3..ec81c3e0434 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -29,8 +29,10 @@ import ( "time" "github.com/google/uuid" + "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" @@ -168,6 +170,73 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadWithDifferentBatchSize() { } } +// TestLoadKeyspaceGroupsTimeout tests there is timeout when loading the initial keyspace group assignment +// from etcd. The initialization of the keyspace group manager should fail. +func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { + re := suite.Require() + + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 1) + re.NotNil(mgr) + defer mgr.Close() + + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0)) + + // Set the timeout to 1 second and inject the delayLoadKeyspaceGroups to return 3 seconds to let + // the loading sleep 3 seconds. + mgr.loadKeyspaceGroupsTimeout = time.Second + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/delayLoadKeyspaceGroups", "return(3)")) + err := mgr.Initialize(true) + // If loading keyspace groups timeout, the initialization should fail with ErrLoadKeyspaceGroupsTerminated. + re.Equal(errs.ErrLoadKeyspaceGroupsTerminated, err) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/delayLoadKeyspaceGroups")) +} + +// TestLoadKeyspaceGroupsSucceedWithTempFailures tests the initialization should succeed when there are temporary +// failures during loading the initial keyspace group assignment from etcd. +func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTempFailures() { + re := suite.Require() + + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 1) + re.NotNil(mgr) + defer mgr.Close() + + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0)) + + // Set the max retry times to 3 and inject the loadKeyspaceGroupsTemporaryFail to return 2 to let + // loading from etcd fail 2 times but the whole initialization still succeeds. + mgr.loadFromEtcdMaxRetryTimes = 3 + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail", "return(2)")) + err := mgr.Initialize(true) + re.NoError(err) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail")) +} + +// TestLoadKeyspaceGroupsFailed tests the initialization should fail when there are too many failures +// during loading the initial keyspace group assignment from etcd. +func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() { + re := suite.Require() + + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 1) + re.NotNil(mgr) + defer mgr.Close() + + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, true, + mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0)) + + // Set the max retry times to 3 and inject the loadKeyspaceGroupsTemporaryFail to return 3 to let + // loading from etcd fail 3 times which should cause the whole initialization to fail. + mgr.loadFromEtcdMaxRetryTimes = 3 + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail", "return(3)")) + err := mgr.Initialize(true) + re.Error(err) + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/loadKeyspaceGroupsTemporaryFail")) +} + // TestWatchAndDynamicallyApplyChanges tests the keyspace group manager watch and dynamically apply // keyspace groups' membership/distribution meta changes. func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges() { From 9f4417a4df2a0f111cc8c13c6b1ad48a7610dd3b Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Thu, 6 Apr 2023 20:06:09 -0700 Subject: [PATCH 2/2] Fix go fmt errors Signed-off-by: Bin Shi --- errors.toml | 5 +++++ pkg/tso/keyspace_group_manager_test.go | 19 +++++++++---------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/errors.toml b/errors.toml index 5b9ecd0a345..1133a8bf12a 100644 --- a/errors.toml +++ b/errors.toml @@ -1,6 +1,11 @@ # AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen # YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER. +["ErrLoadKeyspaceGroupsRetryExhaustd"] +error = ''' +load keyspace groups retry exhausted, %s +''' + ["ErrLoadKeyspaceGroupsTerminated"] error = ''' load keyspace groups terminated diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index ec81c3e0434..64f1a462293 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -175,7 +175,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadWithDifferentBatchSize() { func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { re := suite.Require() - mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 1) + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1) re.NotNil(mgr) defer mgr.Close() @@ -198,7 +198,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTempFailures() { re := suite.Require() - mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 1) + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1) re.NotNil(mgr) defer mgr.Close() @@ -220,7 +220,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTem func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() { re := suite.Require() - mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 1) + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 1) re.NotNil(mgr) defer mgr.Close() @@ -243,7 +243,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges( re := suite.Require() // Start with the empty keyspace group assignment. - mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0, 0) + mgr := newUniqueKeyspaceGroupManager(suite.ctx, suite.etcdClient, suite.cfg, 0) re.NotNil(mgr) defer mgr.Close() err := mgr.Initialize(true) @@ -343,7 +343,7 @@ func runTestLoadKeyspaceGroupsAssignment( probabilityAssignToMe int, // percentage of assigning keyspace groups to this host/pod ) { idsExpected := []int{} - mgr := newUniqueKeyspaceGroupManager(ctx, etcdClient, cfg, 0, loadKeyspaceGroupsBatchSize) + mgr := newUniqueKeyspaceGroupManager(ctx, etcdClient, cfg, loadKeyspaceGroupsBatchSize) re.NotNil(mgr) defer mgr.Close() @@ -390,8 +390,9 @@ func runTestLoadKeyspaceGroupsAssignment( } func newUniqueKeyspaceGroupManager( - ctx context.Context, etcdClient *clientv3.Client, cfg *TestServiceConfig, - loadKeyspaceGroupsTimeout time.Duration, // set to 0 to use the default value + ctx context.Context, + etcdClient *clientv3.Client, + cfg *TestServiceConfig, loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value ) *KeyspaceGroupManager { tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.AdvertiseListenAddr} @@ -403,9 +404,7 @@ func newUniqueKeyspaceGroupManager( keyspaceGroupManager := NewKeyspaceGroupManager( ctx, tsoServiceID, etcdClient, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath, cfg) - if loadKeyspaceGroupsTimeout != 0 { - keyspaceGroupManager.loadKeyspaceGroupsTimeout = loadKeyspaceGroupsTimeout - } + if loadKeyspaceGroupsBatchSize != 0 { keyspaceGroupManager.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize }