Skip to content

Commit

Permalink
Fix test failure
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed May 4, 2023
1 parent 098029b commit cd4a076
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 26 deletions.
7 changes: 3 additions & 4 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMemb
return member, nil
}

// ResignPrimary resigns the primary of the given keyspace and keyspace group.
func (s *Server) ResignPrimary() error {
member, err := s.keyspaceGroupManager.GetElectionMember(
mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
// ResignPrimary resigns the primary of the given keyspace.
func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (kgm *KeyspaceGroupManager) FindGroupByKeyspaceID(
return curAM, curKeyspaceGroup, curKeyspaceGroupID, nil
}

// GetElectionMember returns the election member of the given keyspace group
// GetElectionMember returns the election member of the keyspace group serving the given keyspace.
func (kgm *KeyspaceGroupManager) GetElectionMember(
keyspaceID, keyspaceGroupID uint32,
) (ElectionMember, error) {
Expand Down
13 changes: 9 additions & 4 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mcs

import (
"context"
"fmt"
"time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -92,12 +93,16 @@ func (tc *TestTSOCluster) DestroyServer(addr string) {
}

// ResignPrimary resigns the primary TSO server.
func (tc *TestTSOCluster) ResignPrimary() {
tc.GetPrimary(mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID).ResignPrimary()
func (tc *TestTSOCluster) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
primaryServer := tc.GetPrimaryServer(keyspaceID, keyspaceGroupID)
if primaryServer == nil {
return fmt.Errorf("no tso server serves this keyspace %d", keyspaceID)
}
return primaryServer.ResignPrimary(keyspaceID, keyspaceGroupID)
}

// GetPrimary returns the primary TSO server.
func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
// GetPrimaryServer returns the primary TSO server of the given keyspace
func (tc *TestTSOCluster) GetPrimaryServer(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
return server
Expand Down
11 changes: 8 additions & 3 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func WaitForTSOServiceAvailable(ctx context.Context, pdClient pd.Client) error {
// CheckMultiKeyspacesTSO checks the correctness of TSO for multiple keyspaces.
func CheckMultiKeyspacesTSO(
ctx context.Context, re *require.Assertions,
clients []pd.Client, duration time.Duration,
clients []pd.Client, parallelAct func(),
) {
ctx, cancel := context.WithCancel(ctx)
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -184,8 +184,13 @@ func CheckMultiKeyspacesTSO(
}(client)
}

time.Sleep(duration)
cancel()
wg.Add(1)
go func() {
defer wg.Done()
parallelAct()
cancel()
}()

wg.Wait()
}

Expand Down
10 changes: 7 additions & 3 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp
clients := mcs.WaitForMultiKeyspacesTSOAvailable(
suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
re.Equal(len(keyspaceIDs), len(clients))
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, 3*time.Second)
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
time.Sleep(3 * time.Second)
})
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKeyspaceGroup() {
Expand Down Expand Up @@ -200,7 +202,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
clients := mcs.WaitForMultiKeyspacesTSOAvailable(
suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
re.Equal(len(keyspaceIDs), len(clients))
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, 3*time.Second)
mcs.CheckMultiKeyspacesTSO(suite.ctx, re, clients, func() {
time.Sleep(3 * time.Second)
})
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
Expand Down Expand Up @@ -231,7 +235,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
})
ts.Physical += time.Hour.Milliseconds()
// Set the TSO of the keyspace group 1 to a large value.
err = suite.tsoCluster.GetPrimary(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
err = suite.tsoCluster.GetPrimaryServer(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
re.NoError(err)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
Expand Down
21 changes: 11 additions & 10 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,14 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() {

type CommonTestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
pdLeader *tests.TestServer
tsoPrimary *tso.Server
backendEndpoints string
ctx context.Context
cancel context.CancelFunc
cluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
pdLeader *tests.TestServer
// tsoDefaultPrimaryServer is the primary server of the default keyspace group
tsoDefaultPrimaryServer *tso.Server
backendEndpoints string
}

func TestCommonTestSuite(t *testing.T) {
Expand All @@ -380,7 +381,7 @@ func (suite *CommonTestSuite) SetupSuite() {
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
suite.NoError(err)
suite.tsoCluster.WaitForDefaultPrimaryServing(re)
suite.tsoPrimary = suite.tsoCluster.GetPrimary(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
suite.tsoDefaultPrimaryServer = suite.tsoCluster.GetPrimaryServer(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
}

func (suite *CommonTestSuite) TearDownSuite() {
Expand All @@ -401,14 +402,14 @@ func (suite *CommonTestSuite) TearDownSuite() {
func (suite *CommonTestSuite) TestAdvertiseAddr() {
re := suite.Require()

conf := suite.tsoPrimary.GetConfig()
conf := suite.tsoDefaultPrimaryServer.GetConfig()
re.Equal(conf.GetListenAddr(), conf.GetAdvertiseListenAddr())
}

func (suite *CommonTestSuite) TestMetrics() {
re := suite.Require()

resp, err := http.Get(suite.tsoPrimary.GetConfig().GetAdvertiseListenAddr() + "/metrics")
resp, err := http.Get(suite.tsoDefaultPrimaryServer.GetConfig().GetAdvertiseListenAddr() + "/metrics")
re.NoError(err)
defer resp.Body.Close()
re.Equal(http.StatusOK, resp.StatusCode)
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (suite *tsoClientTestSuite) TestRandomResignLeader() {
for _, keyspaceID := range keyspaceIDs {
go func(keyspaceID uint32) {
defer wg.Done()
err := suite.tsoCluster.ResignPrimary(keyspaceID, 0)
err := suite.tsoCluster.ResignPrimary(keyspaceID, mcsutils.DefaultKeyspaceGroupID)
re.NoError(err)
suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, 0)
}(keyspaceID)
Expand Down

0 comments on commit cd4a076

Please sign in to comment.