diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 1436441f2c67..c35723236e83 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -379,8 +379,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() re.Equal(uint32(1), kg1.ID) re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsSplitting()) - // Make sure the leader of the keyspace group 2 is elected. - member, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 2).GetMember(222, 2) + // Make sure the leader of the keyspace group 1 is elected. + member, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 1).GetMember(222, 1) re.NoError(err) re.NotNil(member) // Prepare the client for keyspace 222. @@ -630,3 +630,84 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() { }, testutil.WithTickInterval(5*time.Second), testutil.WithWaitFor(time.Minute)) re.Greater(tsoutil.CompareTimestamp(&mergedTS, &ts), 0) } + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() { + re := suite.Require() + // Create the keyspace group 1 with keyspaces [111, 222, 333]. + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{111, 222, 333}, + }, + }, + }) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) + re.False(kg1.IsMerging()) + // Make sure the leader of the keyspace group 1 is elected. + member, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 1).GetMember(222, 1) + re.NoError(err) + re.NotNil(member) + // Prepare the client for keyspace 222. + tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) + re.NoError(err) + re.NotNil(tsoClient) + // Request the TSO for keyspace 222 concurrently. + var ( + wg sync.WaitGroup + ctx, cancel = context.WithCancel(suite.ctx) + lastPhysical, lastLogical int64 + ) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + // Make sure at least one TSO request is successful. + re.NotEmpty(lastPhysical) + return + default: + } + physical, logical, err := tsoClient.GetTS(ctx) + if err != nil { + errMsg := err.Error() + // Ignore the errors caused by the merge and context cancellation. + if strings.Contains(errMsg, "context canceled") || + strings.Contains(errMsg, "not leader") || + strings.Contains(errMsg, "not served") || + strings.Contains(errMsg, "ErrKeyspaceNotAssigned") || + strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") { + continue + } + re.FailNow(errMsg) + } + if physical == lastPhysical { + re.Greater(logical, lastLogical) + } else { + re.Greater(physical, lastPhysical) + } + lastPhysical, lastLogical = physical, logical + } + }() + // Merge the keyspace group 1 to the default keyspace group. + handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ + MergeList: []uint32{1}, + }) + // Wait for the default keyspace group to finish the merge. + testutil.Eventually(re, func() bool { + kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) + re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID) + for _, keyspaceID := range []uint32{111, 222, 333} { + re.Contains(kg.Keyspaces, keyspaceID) + } + return !kg.IsMergeTarget() + }) + // Stop the client. + cancel() + wg.Wait() +}