diff --git a/errors.toml b/errors.toml index b5d4dafdf0d8..051e1dbe02fe 100644 --- a/errors.toml +++ b/errors.toml @@ -761,6 +761,11 @@ error = ''' the keyspace group id is invalid, %s ''' +["PD:tso:ErrKeyspaceGroupIsMerging"] +error = ''' +the keyspace group %d is merging +''' + ["PD:tso:ErrKeyspaceGroupNotInitialized"] error = ''' the keyspace group %d isn't initialized diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index d98b5e9dfd03..9fb6548d6747 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -55,6 +55,7 @@ var ( ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized")) ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned")) ErrGetMinTS = errors.Normalize("get min ts failed, %s", errors.RFCCodeText("PD:tso:ErrGetMinTS")) + ErrKeyspaceGroupIsMerging = errors.Normalize("the keyspace group %d is merging", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIsMerging")) ) // member errors diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index d6a378704d80..6e67ccf69514 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -1406,3 +1406,16 @@ func (am *AllocatorManager) GetLeaderAddr() string { } return leaderAddrs[0] } + +// Construct the timestampOracle path prefix, which is: +// 1. for the default keyspace group: +// "" in /pd/{cluster_id}/timestamp +// 2. for the non-default keyspace groups: +// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp +func (am *AllocatorManager) getKeyspaceGroupTSPath(groupID uint32) string { + tsPath := "" + if am.kgID != mcsutils.DefaultKeyspaceGroupID { + tsPath = path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix) + } + return tsPath +} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 284d7dc316ab..2c715d0cc7ce 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "path" "sync" "sync/atomic" "time" @@ -89,16 +88,6 @@ func NewGlobalTSOAllocator( am *AllocatorManager, startGlobalLeaderLoop bool, ) Allocator { - // Construct the timestampOracle path prefix, which is: - // 1. for the default keyspace group: - // "" in /pd/{cluster_id}/timestamp - // 2. for the non-default keyspace groups: - // {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp - tsPath := "" - if am.kgID != mcsutils.DefaultKeyspaceGroupID { - tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix) - } - ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ ctx: ctx, @@ -107,8 +96,7 @@ func NewGlobalTSOAllocator( member: am.member, timestampOracle: ×tampOracle{ client: am.member.GetLeadership().GetClient(), - rootPath: am.rootPath, - tsPath: tsPath, + tsPath: am.getKeyspaceGroupTSPath(am.kgID), storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 1ec3e7ffcc3c..6ed925f971a6 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -34,6 +34,7 @@ import ( "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" @@ -41,6 +42,7 @@ import ( "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -50,6 +52,9 @@ const ( keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election" // primaryKey is the key for keyspace group primary election. primaryKey = "primary" + // mergingCheckInterval is the interval for merging check to see if the keyspace groups + // merging process could be moved forward. + mergingCheckInterval = 5 * time.Second ) type state struct { @@ -241,6 +246,9 @@ type KeyspaceGroupManager struct { groupWatcher *etcdutil.LoopWatcher primaryPathBuilder *kgPrimaryPathBuilder + + // mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group. + mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. @@ -384,12 +392,9 @@ func (kgm *KeyspaceGroupManager) Close() { } func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool { - for _, member := range group.Members { - if member.Address == kgm.tsoServiceID.ServiceAddr { - return true - } - } - return false + return slice.AnyOf(group.Members, func(i int) bool { + return group.Members[i].Address == kgm.tsoServiceID.ServiceAddr + }) } // updateKeyspaceGroup applies the given keyspace group. If the keyspace group is just assigned to @@ -416,9 +421,25 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro return } + oldAM, oldGroup := kgm.getKeyspaceGroupMeta(group.ID) + // If this host owns a replica of the keyspace group which is the merge target, + // it should run the merging checker when the merge state first time changes. + if !oldGroup.IsMergeTarget() && group.IsMergeTarget() { + ctx, cancel := context.WithCancel(kgm.ctx) + kgm.mergeCheckerCancelMap.Store(group.ID, cancel) + kgm.wg.Add(1) + go kgm.mergingChecker(ctx, group.ID, group.MergeState.MergeList) + } + // If the merge state has been finished, cancel its merging checker. + if oldGroup.IsMergeTarget() && !group.IsMergeTarget() { + if cancel, loaded := kgm.mergeCheckerCancelMap.LoadAndDelete(group.ID); loaded && cancel != nil { + cancel.(context.CancelFunc)() + } + } + // If this host is already assigned a replica of this keyspace group, i.e., the election member // is already initialized, just update the meta. - if oldAM, oldGroup := kgm.getKeyspaceGroupMeta(group.ID); oldAM != nil { + if oldAM != nil { kgm.updateKeyspaceGroupMembership(oldGroup, group, true) return } @@ -738,6 +759,10 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest( if err != nil { return pdpb.Timestamp{}, curKeyspaceGroupID, err } + err = kgm.checkTSOMerge(curKeyspaceGroupID) + if err != nil { + return pdpb.Timestamp{}, curKeyspaceGroupID, err + } ts, err = am.HandleRequest(dcLocation, count) return ts, curKeyspaceGroupID, err } @@ -898,3 +923,180 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error { kgm.kgs[id] = splitGroup return nil } + +func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error { + kgm.Lock() + defer kgm.Unlock() + // Check if the keyspace group is in the merging state. + mergeTarget := kgm.kgs[id] + if !mergeTarget.IsMergeTarget() { + return nil + } + // Check if the HTTP client is initialized. + if kgm.httpClient == nil { + return nil + } + statusCode, err := apiutil.DoDelete( + kgm.httpClient, + kgm.cfg.GeBackendEndpoints()+keyspaceGroupsAPIPrefix+fmt.Sprintf("/%d/merge", id)) + if err != nil { + return err + } + if statusCode != http.StatusOK { + log.Warn("failed to finish merging keyspace group", + zap.Uint32("keyspace-group-id", id), + zap.Int("status-code", statusCode)) + return errs.ErrSendRequest.FastGenByArgs() + } + // Pre-update the split keyspace group split state in memory. + mergeTarget.MergeState = nil + kgm.kgs[id] = mergeTarget + return nil +} + +// mergingChecker is used to check if the keyspace group is in merge state, and if so, it will +// make sure the newly merged TSO keep consistent with the original ones. +func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTargetID uint32, mergeList []uint32) { + log.Info("start to merge the keyspace group", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList)) + defer logutil.LogPanic() + defer kgm.wg.Done() + + checkTicker := time.NewTicker(mergingCheckInterval) + defer checkTicker.Stop() + // Prepare the merge map. + mergeMap := make(map[uint32]struct{}, len(mergeList)) + for _, id := range mergeList { + mergeMap[id] = struct{}{} + } + + for { + select { + case <-ctx.Done(): + log.Info("merging checker is closed", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList)) + return + case <-checkTicker.C: + } + // Check if current TSO node is the merge target TSO primary node. + am, err := kgm.GetAllocatorManager(mergeTargetID) + if err != nil { + log.Warn("unable to get the merge target allocator manager", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("keyspace-group-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Error(err)) + continue + } + // If the current TSO node is not the merge target TSO primary node, + // we still need to keep this loop running to avoid unexpected primary changes. + if !am.IsLeader() { + log.Debug("current tso node is not the merge target primary", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList)) + continue + } + // Check if the keyspace group primaries in the merge map are all gone. + if len(mergeMap) != 0 { + for id := range mergeMap { + leaderPath := path.Join(kgm.primaryPathBuilder.getKeyspaceGroupIDPath(id), primaryKey) + val, err := kgm.tsoSvcStorage.Load(leaderPath) + if err != nil { + log.Error("failed to check if the keyspace group primary in the merge list has gone", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Uint32("merge-id", id), + zap.Any("remaining", mergeMap), + zap.Error(err)) + continue + } + if len(val) == 0 { + delete(mergeMap, id) + } + } + } + if len(mergeMap) > 0 { + continue + } + // All the keyspace group primaries in the merge list are gone, + // update the newly merged TSO to make sure it is greater than the original ones. + var mergedTS time.Time + for _, id := range mergeList { + ts, err := kgm.tsoSvcStorage.LoadTimestamp(am.getKeyspaceGroupTSPath(id)) + if err != nil || ts == typeutil.ZeroTime { + log.Error("failed to load the keyspace group TSO", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Uint32("merge-id", id), + zap.Time("ts", ts), + zap.Error(err)) + mergedTS = typeutil.ZeroTime + break + } + if ts.After(mergedTS) { + mergedTS = ts + } + } + if mergedTS == typeutil.ZeroTime { + continue + } + // Update the newly merged TSO. + // TODO: support the Local TSO Allocator. + allocator, err := am.GetAllocator(GlobalDCLocation) + if err != nil { + log.Error("failed to get the allocator", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Error(err)) + continue + } + err = allocator.SetTSO( + tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)), + true, true) + if err != nil { + log.Error("failed to update the newly merged TSO", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Time("merged-ts", mergedTS), + zap.Error(err)) + continue + } + // Finish the merge. + err = kgm.finishMergeKeyspaceGroup(mergeTargetID) + if err != nil { + log.Error("failed to finish the merge", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Error(err)) + continue + } + log.Info("finished merging keyspace group", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Time("merged-ts", mergedTS)) + return + } +} + +// Reject any request if the keyspace group is in merging state, +// we need to wait for the merging checker to finish the TSO merging. +func (kgm *KeyspaceGroupManager) checkTSOMerge( + keyspaceGroupID uint32, +) error { + _, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID) + if !group.IsMerging() { + return nil + } + return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID) +} diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 9c2867966bc2..9995d5cec3f4 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -72,7 +72,6 @@ func NewLocalTSOAllocator( leadership: leadership, timestampOracle: ×tampOracle{ client: leadership.GetClient(), - rootPath: am.rootPath, tsPath: tsPath, storage: am.storage, saveInterval: am.saveInterval, diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index aa1a424d8cd5..54f0cb927be8 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -60,8 +60,7 @@ type tsoObject struct { // timestampOracle is used to maintain the logic of TSO. type timestampOracle struct { - client *clientv3.Client - rootPath string + client *clientv3.Client // When tsPath is empty, it means that it is a global timestampOracle. tsPath string storage endpoint.TSOStorage diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 22c131be252d..d265d8fc73bb 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" "github.com/tikv/pd/pkg/election" + "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" @@ -263,7 +264,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { err error ) testutil.Eventually(re, func() bool { - ts, err = suite.requestTSO(re, 1, 222, 1) + ts, err = suite.requestTSO(re, 222, 1) return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0 }) ts.Physical += time.Hour.Milliseconds() @@ -282,22 +283,22 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { // Check the split TSO from keyspace group 2. var splitTS pdpb.Timestamp testutil.Eventually(re, func() bool { - splitTS, err = suite.requestTSO(re, 1, 222, 2) + splitTS, err = suite.requestTSO(re, 222, 2) return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0 }) - splitTS, err = suite.requestTSO(re, 1, 222, 2) + splitTS, err = suite.requestTSO(re, 222, 2) + re.NoError(err) re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0) } func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( re *require.Assertions, - count, keyspaceID, keyspaceGroupID uint32, + keyspaceID, keyspaceGroupID uint32, ) (pdpb.Timestamp, error) { primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID) kgm := primary.GetKeyspaceGroupManager() re.NotNil(kgm) - ts, _, err := kgm.HandleTSORequest(keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, count) - re.NoError(err) + ts, _, err := kgm.HandleTSORequest(keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, 1) return ts, err } @@ -357,36 +358,58 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls()) // Finish the split. handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) + // Wait for the keyspace groups to finish the split. + waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{111}, []uint32{222, 333}) +} + +func waitFinishSplit( + re *require.Assertions, + server *tests.TestServer, + splitSourceID, splitTargetID uint32, + splitSourceKeyspaces, splitTargetKeyspaces []uint32, +) { + testutil.Eventually(re, func() bool { + kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, splitTargetID) + re.Equal(splitTargetID, kg.ID) + re.Equal(splitTargetKeyspaces, kg.Keyspaces) + return !kg.IsSplitTarget() + }) + testutil.Eventually(re, func() bool { + kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, splitSourceID) + re.Equal(splitSourceID, kg.ID) + re.Equal(splitSourceKeyspaces, kg.Keyspaces) + return !kg.IsSplitSource() + }) } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() { re := suite.Require() // Enable the failpoint to slow down the system time to test whether the TSO is monotonic. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/systemTimeSlow", `return(true)`)) - // Create the keyspace group 1 with keyspaces [111, 222, 333]. + // Create the keyspace group 1 with keyspaces [444, 555, 666]. handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { ID: 1, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), - Keyspaces: []uint32{111, 222, 333}, + Keyspaces: []uint32{444, 555, 666}, }, }, }) kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) re.Equal(uint32(1), kg1.ID) - re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) + re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces) re.False(kg1.IsSplitting()) - // Make sure the leader of the keyspace group 2 is elected. - member, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 2).GetMember(222, 2) + // Make sure the leader of the keyspace group 1 is elected. + member, err := suite.tsoCluster.WaitForPrimaryServing(re, 555, 1).GetMember(555, 1) re.NoError(err) re.NotNil(member) - // Prepare the client for keyspace 222. - tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) + // Prepare the client for keyspace 555. + tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 555, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) re.NoError(err) re.NotNil(tsoClient) - // Request the TSO for keyspace 222 concurrently. + // Request the TSO for keyspace 555 concurrently. var ( wg sync.WaitGroup ctx, cancel = context.WithCancel(suite.ctx) @@ -426,15 +449,10 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() // Split the keyspace group 1 to 2. handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ NewID: 2, - Keyspaces: []uint32{222, 333}, - }) - // Wait for the keyspace group 2 to finish the split. - testutil.Eventually(re, func() bool { - kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) - re.Equal(uint32(2), kg2.ID) - re.Equal([]uint32{222, 333}, kg2.Keyspaces) - return !kg2.IsSplitTarget() + Keyspaces: []uint32{555, 666}, }) + // Wait for the keyspace groups to finish the split. + waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666}) // Stop the client. cancel() wg.Wait() @@ -540,10 +558,9 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { // Trigger checkTSOSplit to ensure the split is finished. testutil.Eventually(re, func() bool { _, _, err = clientB.(pd.Client).GetTS(ctx) - re.NoError(err) - kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) - return !kg.IsSplitting() + return err == nil }) + waitFinishSplit(re, leaderServer, 0, 1, []uint32{mcsutils.DefaultKeyspaceID, 1}, []uint32{2}) clientB.(pd.Client).Close() // Then split keyspace group 0 to 2 with keyspace 1. @@ -555,10 +572,9 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { // Trigger checkTSOSplit to ensure the split is finished. testutil.Eventually(re, func() bool { _, _, err = clientA.(pd.Client).GetTS(ctx) - re.NoError(err) - kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) - return !kg.IsSplitting() + return err == nil }) + waitFinishSplit(re, leaderServer, 0, 2, []uint32{mcsutils.DefaultKeyspaceID}, []uint32{1}) clientA.(pd.Client).Close() // Check the keyspace group 0 is split to 1 and 2. @@ -574,3 +590,139 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) } + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() { + re := suite.Require() + // Create the keyspace group 1 and 2 with keyspaces [111, 222] and [333]. + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{111, 222}, + }, + { + ID: 2, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{333}, + }, + }, + }) + // Get a TSO from the keyspace group 1. + var ( + ts pdpb.Timestamp + err error + ) + testutil.Eventually(re, func() bool { + ts, err = suite.requestTSO(re, 222, 1) + return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0 + }) + ts.Physical += time.Hour.Milliseconds() + // Set the TSO of the keyspace group 1 to a large value. + err = suite.tsoCluster.GetPrimaryServer(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) + re.NoError(err) + // Merge the keyspace group 1 and 2 to the default keyspace group. + handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ + MergeList: []uint32{1, 2}, + }) + // Check the keyspace group 1 and 2 are merged to the default keyspace group. + kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) + re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID) + for _, keyspaceID := range []uint32{111, 222, 333} { + re.Contains(kg.Keyspaces, keyspaceID) + } + re.True(kg.IsMergeTarget()) + // Check the merged TSO from the default keyspace group is greater than the TSO from the keyspace group 1. + var mergedTS pdpb.Timestamp + testutil.Eventually(re, func() bool { + mergedTS, err = suite.requestTSO(re, 333, mcsutils.DefaultKeyspaceGroupID) + if err != nil { + re.ErrorIs(err, errs.ErrKeyspaceGroupIsMerging) + } + return err == nil && tsoutil.CompareTimestamp(&mergedTS, &pdpb.Timestamp{}) > 0 + }, testutil.WithTickInterval(5*time.Second), testutil.WithWaitFor(time.Minute)) + re.Greater(tsoutil.CompareTimestamp(&mergedTS, &ts), 0) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() { + re := suite.Require() + // Create the keyspace group 1 with keyspaces [111, 222, 333]. + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{111, 222, 333}, + }, + }, + }) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) + re.False(kg1.IsMerging()) + // Make sure the leader of the keyspace group 1 is elected. + member, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 1).GetMember(222, 1) + re.NoError(err) + re.NotNil(member) + // Prepare the client for keyspace 222. + tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) + re.NoError(err) + re.NotNil(tsoClient) + // Request the TSO for keyspace 222 concurrently. + var ( + wg sync.WaitGroup + ctx, cancel = context.WithCancel(suite.ctx) + lastPhysical, lastLogical int64 + ) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + // Make sure at least one TSO request is successful. + re.NotEmpty(lastPhysical) + return + default: + } + physical, logical, err := tsoClient.GetTS(ctx) + if err != nil { + errMsg := err.Error() + // Ignore the errors caused by the merge and context cancellation. + if strings.Contains(errMsg, "context canceled") || + strings.Contains(errMsg, "not leader") || + strings.Contains(errMsg, "not served") || + strings.Contains(errMsg, "ErrKeyspaceNotAssigned") || + strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") { + continue + } + re.FailNow(errMsg) + } + if physical == lastPhysical { + re.Greater(logical, lastLogical) + } else { + re.Greater(physical, lastPhysical) + } + lastPhysical, lastLogical = physical, logical + } + }() + // Merge the keyspace group 1 to the default keyspace group. + handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ + MergeList: []uint32{1}, + }) + // Wait for the default keyspace group to finish the merge. + testutil.Eventually(re, func() bool { + kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) + re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID) + for _, keyspaceID := range []uint32{111, 222, 333} { + re.Contains(kg.Keyspaces, keyspaceID) + } + return !kg.IsMergeTarget() + }) + // Stop the client. + cancel() + wg.Wait() +} diff --git a/tests/server/apiv2/handlers/testutil.go b/tests/server/apiv2/handlers/testutil.go index b638f1bbba4e..900cd84b8293 100644 --- a/tests/server/apiv2/handlers/testutil.go +++ b/tests/server/apiv2/handlers/testutil.go @@ -205,7 +205,7 @@ func MustDeleteKeyspaceGroup(re *require.Assertions, server *tests.TestServer, i re.Equal(http.StatusOK, resp.StatusCode, string(data)) } -// MustSplitKeyspaceGroup updates a keyspace group with HTTP API. +// MustSplitKeyspaceGroup splits a keyspace group with HTTP API. func MustSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32, request *handlers.SplitKeyspaceGroupByIDParams) { data, err := json.Marshal(request) re.NoError(err) @@ -232,3 +232,18 @@ func MustFinishSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServ re.NoError(err) re.Equal(http.StatusOK, resp.StatusCode, string(data)) } + +// MustMergeKeyspaceGroup merges keyspace groups with HTTP API. +func MustMergeKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32, request *handlers.MergeKeyspaceGroupsParams) { + data, err := json.Marshal(request) + re.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/merge", id), bytes.NewBuffer(data)) + re.NoError(err) + // Send request. + resp, err := dialClient.Do(httpReq) + re.NoError(err) + defer resp.Body.Close() + data, err = io.ReadAll(resp.Body) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode, string(data)) +}