diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index d9866b7a9db..25a832c5f48 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -244,10 +244,9 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb return member, nil } -// ResignPrimary resigns the primary of the given keyspace and keyspace group. -func (s *Server) ResignPrimary() error { - member, err := s.keyspaceGroupManager.GetElectionMember( - mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) +// ResignPrimary resigns the primary of the given keyspace. +func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error { + member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID) if err != nil { return err } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 4c9e8dac4ca..0089e0d9bdc 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -812,7 +812,7 @@ func (kgm *KeyspaceGroupManager) FindGroupByKeyspaceID( return curAM, curKeyspaceGroup, curKeyspaceGroupID, nil } -// GetElectionMember returns the election member of the given keyspace group +// GetElectionMember returns the election member of the keyspace group serving the given keyspace. func (kgm *KeyspaceGroupManager) GetElectionMember( keyspaceID, keyspaceGroupID uint32, ) (ElectionMember, error) { diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go index 228f506454d..dbc9964b62b 100644 --- a/tests/integrations/mcs/cluster.go +++ b/tests/integrations/mcs/cluster.go @@ -16,6 +16,7 @@ package mcs import ( "context" + "fmt" "time" "github.com/stretchr/testify/require" @@ -92,12 +93,16 @@ func (tc *TestTSOCluster) DestroyServer(addr string) { } // ResignPrimary resigns the primary TSO server. -func (tc *TestTSOCluster) ResignPrimary() { - tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary() +func (tc *TestTSOCluster) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error { + primaryServer := tc.GetPrimaryServer(keyspaceID, keyspaceGroupID) + if primaryServer == nil { + return fmt.Errorf("no tso server serves this keyspace %d", keyspaceID) + } + return primaryServer.ResignPrimary(keyspaceID, keyspaceGroupID) } -// GetPrimary returns the primary TSO server. -func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server { +// GetPrimaryServer returns the primary TSO server of the given keyspace +func (tc *TestTSOCluster) GetPrimaryServer(keyspaceID, keyspaceGroupID uint32) *tso.Server { for _, server := range tc.servers { if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) { return server diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index 7151c7d3416..3ca1ad39436 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -154,7 +154,7 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error { // CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces. func CheckMultiKeyspacesTSO( ctx context.Context, re *require.Assertions, - clients []pd.Client, duration time.Duration, + clients []pd.Client, parallelAct func(), ) { ctx, cancel := context.WithCancel(ctx) wg := sync.WaitGroup{} @@ -184,8 +184,13 @@ func CheckMultiKeyspacesTSO( }(client) } - time.Sleep(duration) - cancel() + wg.Add(1) + go func() { + defer wg.Done() + parallelAct() + cancel() + }() + wg.Wait() } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index f2ca2e37d32..db032faa251 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -136,7 +136,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp clients := mcs.WaitForMultiKeyspacesTSOAvailable( suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) re.Equal(len(keyspaceIDs), len(clients)) - mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, 3*time.Second) + mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() { + time.Sleep(3 * time.Second) + }) } func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroup() { @@ -200,7 +202,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe clients := mcs.WaitForMultiKeyspacesTSOAvailable( suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) re.Equal(len(keyspaceIDs), len(clients)) - mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, 3*time.Second) + mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() { + time.Sleep(3 * time.Second) + }) } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { @@ -231,7 +235,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { }) ts.Physical += time.Hour.Milliseconds() // Set the TSO of the keyspace group 1 to a large value. - err = suite.tsoCluster.GetPrimary(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) + err = suite.tsoCluster.GetPrimaryServer(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) re.NoError(err) // Split the keyspace group 1 to 2. handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index cc9440045ea..cb43f5a3b9a 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -349,13 +349,14 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() { type CommonTestSuite struct { suite.Suite - ctx context.Context - cancel context.CancelFunc - cluster *tests.TestCluster - tsoCluster *mcs.TestTSOCluster - pdLeader *tests.TestServer - tsoPrimary *tso.Server - backendEndpoints string + ctx context.Context + cancel context.CancelFunc + cluster *tests.TestCluster + tsoCluster *mcs.TestTSOCluster + pdLeader *tests.TestServer + // tsoDefaultPrimaryServer is the primary server of the default keyspace group + tsoDefaultPrimaryServer *tso.Server + backendEndpoints string } func TestCommonTestSuite(t *testing.T) { @@ -380,7 +381,7 @@ func (suite *CommonTestSuite) SetupSuite() { suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) suite.NoError(err) suite.tsoCluster.WaitForDefaultPrimaryServing(re) - suite.tsoPrimary = suite.tsoCluster.GetPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) + suite.tsoDefaultPrimaryServer = suite.tsoCluster.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID) } func (suite *CommonTestSuite) TearDownSuite() { @@ -401,14 +402,14 @@ func (suite *CommonTestSuite) TearDownSuite() { func (suite *CommonTestSuite) TestAdvertiseAddr() { re := suite.Require() - conf := suite.tsoPrimary.GetConfig() + conf := suite.tsoDefaultPrimaryServer.GetConfig() re.Equal(conf.GetListenAddr(), conf.GetAdvertiseListenAddr()) } func (suite *CommonTestSuite) TestMetrics() { re := suite.Require() - resp, err := http.Get(suite.tsoPrimary.GetConfig().GetAdvertiseListenAddr() + "/metrics") + resp, err := http.Get(suite.tsoDefaultPrimaryServer.GetConfig().GetAdvertiseListenAddr() + "/metrics") re.NoError(err) defer resp.Body.Close() re.Equal(http.StatusOK, resp.StatusCode) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index f6c535f1d2f..6607807c65b 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -251,7 +251,7 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() { for _, keyspaceID := range keyspaceIDs { go func(keyspaceID uint32) { defer wg.Done() - err := suite.tsoCluster.ResignPrimary(keyspaceID, 0) + err := suite.tsoCluster.ResignPrimary(keyspaceID, mcsutils.DefaultKeyspaceGroupID) re.NoError(err) suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, 0) }(keyspaceID)