From 3787d04eae48157003cadc7c436c256ff83833dc Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 4 May 2023 19:48:02 +0800 Subject: [PATCH 1/7] use pd mode Signed-off-by: lhy1024 --- client/pd_service_discovery.go | 6 +++++- tests/integrations/mcs/tso/server_test.go | 3 +++ tests/integrations/tso/client_test.go | 4 +++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index d02932c51ed..8c01ba85312 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -239,10 +239,14 @@ func (c *pdServiceDiscovery) updateMemberLoop() { } func (c *pdServiceDiscovery) updateServiceModeLoop() { + defer c.wg.Done() failpoint.Inject("skipUpdateServiceMode", func() { failpoint.Return() }) - defer c.wg.Done() + failpoint.Inject("usePDServiceMode", func() { + c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE) + failpoint.Return() + }) ctx, cancel := context.WithCancel(c.ctx) defer cancel() diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index d074c49a497..9eae238056a 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -239,6 +240,7 @@ func (suite *APIServerForwardTestSuite) SetupSuite() { suite.NoError(suite.pdLeader.BootstrapCluster()) suite.addRegions() + suite.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)")) suite.pdClient, err = pd.NewClientWithContext(context.Background(), []string{suite.backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1)) suite.NoError(err) @@ -258,6 +260,7 @@ func (suite *APIServerForwardTestSuite) TearDownSuite() { } suite.cluster.Destroy() suite.cancel() + suite.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode")) } func (suite *APIServerForwardTestSuite) TestForwardTSORelated() { diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 31258a23bd1..b5097b86798 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/testutil" + bs "github.com/tikv/pd/pkg/basicserver" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/tempurl" @@ -317,8 +318,9 @@ func TestMixedTSODeployment(t *testing.T) { err = apiSvr.Run() re.NoError(err) - _, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) + s, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) defer cleanup() + mcs.WaitForPrimaryServing(re, map[string]bs.Server{s.GetAddr(): s}) ctx1, cancel1 := context.WithCancel(context.Background()) var wg sync.WaitGroup From 787509b10af20e3d6a28aaa4c25c57de98a85f4d Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 5 May 2023 02:01:09 +0800 Subject: [PATCH 2/7] use mcs/utils and use failpoint test Signed-off-by: lhy1024 --- pkg/keyspace/keyspace.go | 15 +-- pkg/keyspace/keyspace_test.go | 5 +- pkg/keyspace/util.go | 5 +- pkg/keyspace/util_test.go | 5 +- pkg/mcs/utils/constant.go | 3 + server/grpc_service.go | 19 +++- server/server.go | 6 + tests/integrations/mcs/tso/server_test.go | 113 ++++++++++++++++++- tests/server/apiv2/handlers/keyspace_test.go | 10 +- 9 files changed, 154 insertions(+), 27 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 7bd8de7bc28..cf549fdf57a 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/schedule" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" @@ -38,10 +39,6 @@ const ( AllocStep = uint64(100) // AllocLabel is used to label keyspace idAllocator's metrics. AllocLabel = "keyspace-idAlloc" - // DefaultKeyspaceName is the name reserved for default keyspace. - DefaultKeyspaceName = "DEFAULT" - // DefaultKeyspaceID is the id of default keyspace. - DefaultKeyspaceID = uint32(0) // regionLabelIDPrefix is used to prefix the keyspace region label. regionLabelIDPrefix = "keyspaces/" // regionLabelKey is the key for keyspace id in keyspace region label. @@ -106,13 +103,13 @@ func NewKeyspaceManager(store endpoint.KeyspaceStorage, // Bootstrap saves default keyspace info. func (manager *Manager) Bootstrap() error { // Split Keyspace Region for default keyspace. - if err := manager.splitKeyspaceRegion(DefaultKeyspaceID); err != nil { + if err := manager.splitKeyspaceRegion(utils.DefaultKeyspaceID); err != nil { return err } now := time.Now().Unix() defaultKeyspaceMeta := &keyspacepb.KeyspaceMeta{ - Id: DefaultKeyspaceID, - Name: DefaultKeyspaceName, + Id: utils.DefaultKeyspaceID, + Name: utils.DefaultKeyspaceName, State: keyspacepb.KeyspaceState_ENABLED, CreatedAt: now, StateChangedAt: now, @@ -425,7 +422,7 @@ func (manager *Manager) UpdateKeyspaceConfig(name string, mutations []*Mutation) // It returns error if saving failed, operation not allowed, or if keyspace not exists. func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) { // Changing the state of default keyspace is not allowed. - if name == DefaultKeyspaceName { + if name == utils.DefaultKeyspaceName { log.Warn("[keyspace] failed to update keyspace config", zap.Error(errModifyDefault), ) @@ -477,7 +474,7 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key // It returns error if saving failed, operation not allowed, or if keyspace not exists. func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.KeyspaceState, now int64) (*keyspacepb.KeyspaceMeta, error) { // Changing the state of default keyspace is not allowed. - if id == DefaultKeyspaceID { + if id == utils.DefaultKeyspaceID { log.Warn("[keyspace] failed to update keyspace config", zap.Error(errModifyDefault), ) diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index f1ef85711fd..5c458684635 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" @@ -163,7 +164,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceConfig() { re.Error(err) } // Changing config of DEFAULT keyspace is allowed. - updated, err := manager.UpdateKeyspaceConfig(DefaultKeyspaceName, mutations) + updated, err := manager.UpdateKeyspaceConfig(utils.DefaultKeyspaceName, mutations) re.NoError(err) // remove auto filled fields delete(updated.Config, TSOKeyspaceGroupIDKey) @@ -203,7 +204,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() { _, err = manager.UpdateKeyspaceState(createRequest.Name, keyspacepb.KeyspaceState_ENABLED, newTime) re.Error(err) // Changing state of DEFAULT keyspace is not allowed. - _, err = manager.UpdateKeyspaceState(DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime) + _, err = manager.UpdateKeyspaceState(utils.DefaultKeyspaceName, keyspacepb.KeyspaceState_DISABLED, newTime) re.Error(err) } } diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index 69c1e776f04..b739fea8898 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/tikv/pd/pkg/codec" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/storage/endpoint" ) @@ -77,7 +78,7 @@ func validateID(id uint32) error { if id > spaceIDMax { return errors.Errorf("illegal keyspace id %d, larger than spaceID Max %d", id, spaceIDMax) } - if id == DefaultKeyspaceID { + if id == utils.DefaultKeyspaceID { return errors.Errorf("illegal keyspace id %d, collides with default keyspace id", id) } return nil @@ -94,7 +95,7 @@ func validateName(name string) error { if !isValid { return errors.Errorf("illegal keyspace name %s, should contain only alphanumerical and underline", name) } - if name == DefaultKeyspaceName { + if name == utils.DefaultKeyspaceName { return errors.Errorf("illegal keyspace name %s, collides with default keyspace name", name) } return nil diff --git a/pkg/keyspace/util_test.go b/pkg/keyspace/util_test.go index 40277e298b6..02da4814d50 100644 --- a/pkg/keyspace/util_test.go +++ b/pkg/keyspace/util_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/codec" "github.com/tikv/pd/pkg/schedule/labeler" + "github.com/tikv/pd/pkg/mcs/utils" ) func TestValidateID(t *testing.T) { @@ -30,7 +31,7 @@ func TestValidateID(t *testing.T) { id uint32 hasErr bool }{ - {DefaultKeyspaceID, true}, // Reserved id should result in error. + {utils.DefaultKeyspaceID, true}, // Reserved id should result in error. {100, false}, {spaceIDMax - 1, false}, {spaceIDMax, false}, @@ -48,7 +49,7 @@ func TestValidateName(t *testing.T) { name string hasErr bool }{ - {DefaultKeyspaceName, true}, // Reserved name should result in error. + {utils.DefaultKeyspaceName, true}, // Reserved name should result in error. {"keyspaceName1", false}, {"keyspace_name_1", false}, {"10", false}, diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index e29aa6a5008..fba1881b4c4 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -35,6 +35,9 @@ const ( // LeaderTickInterval is the interval to check leader LeaderTickInterval = 50 * time.Millisecond + // DefaultKeyspaceName is the name reserved for default keyspace. + DefaultKeyspaceName = "DEFAULT" + // DefaultKeyspaceID is the default key space id. // Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215) // ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap diff --git a/server/grpc_service.go b/server/grpc_service.go index aac69e4b8c2..080a150e553 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -51,6 +51,7 @@ import ( const ( heartbeatSendTimeout = 5 * time.Second maxRetryTimesGetGlobalTSOFromTSOServer = 3 + retryIntervalGetGlobalTSOFromTSOServer = 500 * time.Millisecond ) // gRPC errors @@ -1774,10 +1775,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan } func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timestamp, error) { - forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName) - if !ok || forwardedHost == "" { - return pdpb.Timestamp{}, ErrNotFoundTSOAddr - } request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: s.clusterID, @@ -1787,11 +1784,16 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest Count: 1, } var ( + forwardedHost string forwardStream tsopb.TSO_TsoClient ts *tsopb.TsoResponse err error ) for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ { + forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName) + if !ok || forwardedHost == "" { + return pdpb.Timestamp{}, ErrNotFoundTSOAddr + } forwardStream, err = s.getTSOForwardStream(forwardedHost) if err != nil { return pdpb.Timestamp{}, err @@ -1799,6 +1801,15 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest forwardStream.Send(request) ts, err = forwardStream.Recv() if err != nil { + if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) { + select { + case s.updateServicePrimaryAddrCh <- struct{}{}: + log.Info("update service primary address when meet not leader error") + default: + } + time.Sleep(retryIntervalGetGlobalTSOFromTSOServer) + continue + } if strings.Contains(err.Error(), codes.Unavailable.String()) { s.tsoClientPool.Lock() delete(s.tsoClientPool.clients, forwardedHost) diff --git a/server/server.go b/server/server.go index e83831364c7..6e0ba68d9ae 100644 --- a/server/server.go +++ b/server/server.go @@ -1840,6 +1840,12 @@ func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int6 return revision, nil } +// SetServicePrimaryAddr sets the primary address directly. +// Note: This function is only used for test. +func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { + s.servicePrimaryMap.Store(serviceName, addr) +} + func (s *Server) servicePrimaryKey(serviceName string) string { return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary") } diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 9eae238056a..ed6973ec9f2 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -224,11 +224,11 @@ func TestAPIServerForwardTestSuite(t *testing.T) { suite.Run(t, new(APIServerForwardTestSuite)) } -func (suite *APIServerForwardTestSuite) SetupSuite() { +func (suite *APIServerForwardTestSuite) SetupTest() { var err error re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) + suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 3) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -246,7 +246,7 @@ func (suite *APIServerForwardTestSuite) SetupSuite() { suite.NoError(err) } -func (suite *APIServerForwardTestSuite) TearDownSuite() { +func (suite *APIServerForwardTestSuite) TearDownTest() { suite.pdClient.Close() etcdClient := suite.pdLeader.GetEtcdClient() @@ -308,6 +308,113 @@ func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() { suite.checkAvailableTSO() } +func (suite *APIServerForwardTestSuite) TestResignTSOPrimaryForward() { + // TODO: test random kill primary with 3 nodes + re := suite.Require() + + tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForDefaultPrimaryServing(re) + + for j := 0; j < 10; j++ { + tc.ResignPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) + tc.WaitForDefaultPrimaryServing(re) + var err error + for i := 0; i < 3; i++ { // try 3 times + _, _, err = suite.pdClient.GetTS(suite.ctx) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + suite.NoError(err) + suite.checkAvailableTSO() + } +} + +func (suite *APIServerForwardTestSuite) TestResignAPIPrimaryForward() { + re := suite.Require() + + tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForDefaultPrimaryServing(re) + + for j := 0; j < 10; j++ { + suite.pdLeader.ResignLeader() + suite.pdLeader = suite.cluster.GetServer(suite.cluster.WaitLeader()) + suite.backendEndpoints = suite.pdLeader.GetAddr() + _, _, err = suite.pdClient.GetTS(suite.ctx) + suite.NoError(err) + } +} + +func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower1() { + suite.checkForwardTSOUnexpectedToFollower(func() { + // unary call will retry internally + // try to update gc safe point + min, err := suite.pdClient.UpdateServiceGCSafePoint(context.Background(), "a", 1000, 1) + suite.NoError(err) + suite.Equal(uint64(0), min) + + }) +} + +func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower2() { + suite.checkForwardTSOUnexpectedToFollower(func() { + // unary call will retry internally + // try to set external ts + ts, err := suite.pdClient.GetExternalTimestamp(suite.ctx) + suite.NoError(err) + err = suite.pdClient.SetExternalTimestamp(suite.ctx, ts+1) + suite.NoError(err) + }) +} + +func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower3() { + suite.checkForwardTSOUnexpectedToFollower(func() { + _, _, err := suite.pdClient.GetTS(suite.ctx) + suite.Error(err) + }) +} + +func (suite *APIServerForwardTestSuite) checkForwardTSOUnexpectedToFollower(checkTSO func()) { + re := suite.Require() + tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) + re.NoError(err) + tc.WaitForDefaultPrimaryServing(re) + + // get follower's address + servers := tc.GetServers() + oldPrimary := tc.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID).GetAddr() + var follower string + for addr := range servers { + if addr != oldPrimary { + follower = addr + break + } + } + re.NotEmpty(follower) + + // write follower's address to cache to simulate cache is not updated. + suite.pdLeader.GetServer().SetServicePrimaryAddr(utils.TSOServiceName, follower) + errorAddr, ok := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName) + suite.True(ok) + suite.Equal(follower, errorAddr) + + // test tso request + checkTSO() + + // test tso request will success after cache is updated + suite.checkAvailableTSO() + newPrimary, exist2 := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName) + suite.True(exist2) + suite.NotEqual(errorAddr, newPrimary) + suite.Equal(oldPrimary, newPrimary) + tc.Destroy() +} + func (suite *APIServerForwardTestSuite) addRegions() { leader := suite.cluster.GetServer(suite.cluster.WaitLeader()) rc := leader.GetServer().GetRaftCluster() diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index f976208f65b..4feb7c9af5c 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -22,7 +22,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/apiv2/handlers" "github.com/tikv/pd/tests" @@ -68,8 +68,8 @@ func (suite *keyspaceTestSuite) TestCreateLoadKeyspace() { loaded := mustLoadKeyspaces(re, suite.server, created.Name) re.Equal(created, loaded) } - defaultKeyspace := mustLoadKeyspaces(re, suite.server, keyspace.DefaultKeyspaceName) - re.Equal(keyspace.DefaultKeyspaceName, defaultKeyspace.Name) + defaultKeyspace := mustLoadKeyspaces(re, suite.server, utils.DefaultKeyspaceName) + re.Equal(utils.DefaultKeyspaceName, defaultKeyspace.Name) re.Equal(keyspacepb.KeyspaceState_ENABLED, defaultKeyspace.State) } @@ -120,7 +120,7 @@ func (suite *keyspaceTestSuite) TestUpdateKeyspaceState() { re.Equal(keyspacepb.KeyspaceState_TOMBSTONE, tombstone.State) } // Changing default keyspace's state is NOT allowed. - success, _ := sendUpdateStateRequest(re, suite.server, keyspace.DefaultKeyspaceName, &handlers.UpdateStateParam{State: "disabled"}) + success, _ := sendUpdateStateRequest(re, suite.server, utils.DefaultKeyspaceName, &handlers.UpdateStateParam{State: "disabled"}) re.False(success) } @@ -134,7 +134,7 @@ func (suite *keyspaceTestSuite) TestLoadRangeKeyspace() { for i, created := range keyspaces { re.Equal(created, loadResponse.Keyspaces[i+1].KeyspaceMeta) } - re.Equal(keyspace.DefaultKeyspaceName, loadResponse.Keyspaces[0].Name) + re.Equal(utils.DefaultKeyspaceName, loadResponse.Keyspaces[0].Name) re.Equal(keyspacepb.KeyspaceState_ENABLED, loadResponse.Keyspaces[0].State) } From d40c363447204a093762f8ef50f6c4e1edc0a1a2 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 5 May 2023 02:11:48 +0800 Subject: [PATCH 3/7] fix lint Signed-off-by: lhy1024 --- pkg/keyspace/keyspace.go | 2 +- pkg/keyspace/util_test.go | 2 +- tests/integrations/client/keyspace_test.go | 9 +++++---- tests/integrations/mcs/tso/server_test.go | 1 - 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index cf549fdf57a..2840bfff069 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -23,8 +23,8 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/id" - "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/schedule" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" diff --git a/pkg/keyspace/util_test.go b/pkg/keyspace/util_test.go index 02da4814d50..c7b3738a811 100644 --- a/pkg/keyspace/util_test.go +++ b/pkg/keyspace/util_test.go @@ -21,8 +21,8 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/codec" - "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/schedule/labeler" ) func TestValidateID(t *testing.T) { diff --git a/tests/integrations/client/keyspace_test.go b/tests/integrations/client/keyspace_test.go index cb3adfd4d2e..219d8f08809 100644 --- a/tests/integrations/client/keyspace_test.go +++ b/tests/integrations/client/keyspace_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/keyspace" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server" ) @@ -61,10 +62,10 @@ func (suite *clientTestSuite) TestLoadKeyspace() { _, err := suite.client.LoadKeyspace(suite.ctx, "non-existing keyspace") re.Error(err) // Loading default keyspace should be successful. - keyspaceDefault, err := suite.client.LoadKeyspace(suite.ctx, keyspace.DefaultKeyspaceName) + keyspaceDefault, err := suite.client.LoadKeyspace(suite.ctx, utils.DefaultKeyspaceName) re.NoError(err) - re.Equal(keyspace.DefaultKeyspaceID, keyspaceDefault.GetId()) - re.Equal(keyspace.DefaultKeyspaceName, keyspaceDefault.GetName()) + re.Equal(utils.DefaultKeyspaceID, keyspaceDefault.GetId()) + re.Equal(utils.DefaultKeyspaceName, keyspaceDefault.GetName()) } func (suite *clientTestSuite) TestWatchKeyspaces() { @@ -105,7 +106,7 @@ func (suite *clientTestSuite) TestWatchKeyspaces() { loaded = <-watchChan re.Equal([]*keyspacepb.KeyspaceMeta{expected}, loaded) // Updates to default keyspace's config should also be captured. - expected, err = suite.srv.GetKeyspaceManager().UpdateKeyspaceConfig(keyspace.DefaultKeyspaceName, []*keyspace.Mutation{ + expected, err = suite.srv.GetKeyspaceManager().UpdateKeyspaceConfig(utils.DefaultKeyspaceName, []*keyspace.Mutation{ { Op: keyspace.OpPut, Key: "config", diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index ed6973ec9f2..81139f07fe8 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -357,7 +357,6 @@ func (suite *APIServerForwardTestSuite) TestForwardTSOUnexpectedToFollower1() { min, err := suite.pdClient.UpdateServiceGCSafePoint(context.Background(), "a", 1000, 1) suite.NoError(err) suite.Equal(uint64(0), min) - }) } From fb1be5cd6268521faa7ea24338b618e23fc16d20 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 5 May 2023 02:32:49 +0800 Subject: [PATCH 4/7] add test Signed-off-by: lhy1024 --- tests/integrations/mcs/tso/server_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 81139f07fe8..232d97f4eea 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -281,9 +281,12 @@ func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() { defer tc.Destroy() tc.WaitForDefaultPrimaryServing(re) - // can use the tso-related interface with new primary + // can use the tso-related interface with old primary oldPrimary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, utils.TSOServiceName) re.True(exist) + suite.checkAvailableTSO() + + // can use the tso-related interface with new primary tc.DestroyServer(oldPrimary) time.Sleep(time.Duration(utils.DefaultLeaderLease) * time.Second) // wait for leader lease timeout tc.WaitForDefaultPrimaryServing(re) From 9107f0b00bec83a908f27aa62073ec61265feb10 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 5 May 2023 18:27:52 +0800 Subject: [PATCH 5/7] address comments Signed-off-by: lhy1024 --- pkg/tso/global_allocator.go | 2 +- pkg/tso/tso.go | 2 +- pkg/utils/tsoutil/tso_dispatcher.go | 2 +- server/grpc_service.go | 12 ++++++------ 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 0dbf82c2398..0227f2c1a64 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -259,7 +259,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) // 5. Check leadership again before we returning the response. if !gta.member.GetLeadership().Check() { tsoCounter.WithLabelValues("not_leader_anymore", gta.timestampOracle.dcLocation).Inc() - return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("not the pd leader anymore") + return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr)) } // 6. Calibrate the logical part to make the TSO unique globally by giving it a unique suffix in the whole cluster globalTSOResp.Logical = gta.timestampOracle.calibrateLogical(globalTSOResp.GetLogical(), suffixBits) diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 523370cd677..525385b42eb 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -359,7 +359,7 @@ func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, s } // In case lease expired after the first check. if !leadership.Check() { - return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("not the pd or local tso allocator leader anymore") + return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested %s anymore", errs.NotLeaderErr)) } resp.SuffixBits = uint32(suffixBits) return resp, nil diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 0d18c5a3e6e..2bf903a0f46 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -138,7 +138,7 @@ func (s *TSODispatcher) dispatch( zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err)) if needUpdateServicePrimaryAddr { - if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) { + if strings.Contains(err.Error(), errs.NotLeaderErr) { select { case updateServicePrimaryAddrChs[0] <- struct{}{}: default: diff --git a/server/grpc_service.go b/server/grpc_service.go index 080a150e553..8aa6b309fbe 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -49,9 +49,9 @@ import ( ) const ( - heartbeatSendTimeout = 5 * time.Second - maxRetryTimesGetGlobalTSOFromTSOServer = 3 - retryIntervalGetGlobalTSOFromTSOServer = 500 * time.Millisecond + heartbeatSendTimeout = 5 * time.Second + maxRetryTimesRequestTSOServer = 3 + retryIntervalRequestTSOServer = 500 * time.Millisecond ) // gRPC errors @@ -1789,7 +1789,7 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest ts *tsopb.TsoResponse err error ) - for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ { + for i := 0; i < maxRetryTimesRequestTSOServer; i++ { forwardedHost, ok := s.GetServicePrimaryAddr(ctx, utils.TSOServiceName) if !ok || forwardedHost == "" { return pdpb.Timestamp{}, ErrNotFoundTSOAddr @@ -1801,13 +1801,13 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest forwardStream.Send(request) ts, err = forwardStream.Recv() if err != nil { - if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) { + if strings.Contains(err.Error(), errs.NotLeaderErr) { select { case s.updateServicePrimaryAddrCh <- struct{}{}: log.Info("update service primary address when meet not leader error") default: } - time.Sleep(retryIntervalGetGlobalTSOFromTSOServer) + time.Sleep(retryIntervalRequestTSOServer) continue } if strings.Contains(err.Error(), codes.Unavailable.String()) { From 1c92923649c7036ce61d4af3c8a8c8e38c97fa79 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 5 May 2023 18:46:55 +0800 Subject: [PATCH 6/7] fix lint Signed-off-by: lhy1024 --- server/grpc_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 8aa6b309fbe..70c26b46448 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -49,7 +49,7 @@ import ( ) const ( - heartbeatSendTimeout = 5 * time.Second + heartbeatSendTimeout = 5 * time.Second maxRetryTimesRequestTSOServer = 3 retryIntervalRequestTSOServer = 500 * time.Millisecond ) From fda89d62936f5aed4808f7221212bb01118acd35 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 5 May 2023 18:53:59 +0800 Subject: [PATCH 7/7] fix conflict Signed-off-by: lhy1024 --- pkg/keyspace/keyspace.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 62277ed6888..939e09437a5 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -582,7 +582,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { if defaultKeyspaceGroup.IsSplitting() { return ErrKeyspaceGroupInSplit } - keyspaces, err := manager.store.LoadRangeKeyspace(txn, DefaultKeyspaceID, 0) + keyspaces, err := manager.store.LoadRangeKeyspace(txn, utils.DefaultKeyspaceID, 0) if err != nil { return err }