From 40cd55563be923df91c48ccd7b89e02ea1c4b3c2 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 13 Jun 2023 17:06:06 +0800 Subject: [PATCH 1/3] Implement the keyspace group merging API Signed-off-by: JmPotato --- pkg/keyspace/keyspace.go | 3 + pkg/keyspace/tso_keyspace_group.go | 125 +++++++++++++++++++- pkg/keyspace/tso_keyspace_group_test.go | 70 ++++++++++- pkg/keyspace/util.go | 4 + pkg/storage/endpoint/tso_keyspace_group.go | 24 ++++ server/apiv2/handlers/tso_keyspace_group.go | 64 ++++++++++ 6 files changed, 284 insertions(+), 6 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index deb6e4fc625..c257c144898 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -688,6 +688,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { if defaultKeyspaceGroup.IsSplitting() { return ErrKeyspaceGroupInSplit } + if defaultKeyspaceGroup.IsMerging() { + return ErrKeyspaceGroupInMerging + } keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, keyspacePatrolBatchSize) if err != nil { return err diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index efe905b6439..fe04677194a 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -340,6 +340,9 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro if oldKG.IsSplitting() && overwrite { return ErrKeyspaceGroupInSplit } + if oldKG.IsMerging() && overwrite { + return ErrKeyspaceGroupInMerging + } newKG := &endpoint.KeyspaceGroup{ ID: keyspaceGroup.ID, UserKind: keyspaceGroup.UserKind, @@ -415,6 +418,9 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind, if kg.IsSplitting() { return ErrKeyspaceGroupInSplit } + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } changed := false @@ -469,6 +475,9 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse if oldKG.IsSplitting() || newKG.IsSplitting() { return ErrKeyspaceGroupInSplit } + if oldKG.IsMerging() || newKG.IsMerging() { + return ErrKeyspaceGroupInMerging + } var updateOld, updateNew bool if !slice.Contains(newKG.Keyspaces, keyspaceID) { @@ -516,6 +525,10 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3 if splitSourceKg.IsSplitting() { return ErrKeyspaceGroupInSplit } + // A keyspace group can not be split when it is in merging. + if splitSourceKg.IsMerging() { + return ErrKeyspaceGroupInMerging + } // Check if the source keyspace group has enough replicas. if len(splitSourceKg.Members) < utils.KeyspaceGroupDefaultReplicaCount { return ErrKeyspaceGroupNotEnoughReplicas @@ -618,11 +631,7 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { if err != nil { return err } - err = m.store.SaveKeyspaceGroup(txn, splitSourceKg) - if err != nil { - return err - } - return nil + return m.store.SaveKeyspaceGroup(txn, splitSourceKg) }); err != nil { return err } @@ -663,6 +672,9 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount if kg.IsSplitting() { return ErrKeyspaceGroupInSplit } + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } exists := make(map[string]struct{}) for _, member := range kg.Members { exists[member.Address] = struct{}{} @@ -719,6 +731,9 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error if kg.IsSplitting() { return ErrKeyspaceGroupInSplit } + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes)) for _, node := range nodes { members = append(members, endpoint.KeyspaceGroupMember{Address: node}) @@ -743,3 +758,103 @@ func (m *GroupManager) IsExistNode(addr string) bool { } return false } + +// MergeKeyspaceGroups merges the keyspace group in the list into the target keyspace group. +func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uint32) error { + mergeListNum := len(mergeList) + if mergeListNum == 0 { + return nil + } + var ( + groups = make(map[uint32]*endpoint.KeyspaceGroup, mergeListNum+1) + mergeTargetKg *endpoint.KeyspaceGroup + ) + m.Lock() + defer m.Unlock() + if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + // Load and check all keyspace groups first. + for _, kgID := range append(mergeList, mergeTargetID) { + kg, err := m.store.LoadKeyspaceGroup(txn, kgID) + if err != nil { + return err + } + if kg == nil { + return ErrKeyspaceGroupNotExists + } + // A keyspace group can not be merged if it's in splitting. + if kg.IsSplitting() { + return ErrKeyspaceGroupInSplit + } + // A keyspace group can not be split when it is in merging. + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } + groups[kgID] = kg + } + mergeTargetKg = groups[mergeTargetID] + // Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group. + for _, kgID := range mergeList { + kg := groups[kgID] + for _, keyspace := range kg.Keyspaces { + if !slice.Contains(mergeTargetKg.Keyspaces, keyspace) { + mergeTargetKg.Keyspaces = append(mergeTargetKg.Keyspaces, keyspace) + } + } + if err := m.store.DeleteKeyspaceGroup(txn, kg.ID); err != nil { + return err + } + } + // Update the merge state of the target keyspace group. + mergeTargetKg.MergeState = &endpoint.MergeState{ + MergeList: mergeList, + } + return m.store.SaveKeyspaceGroup(txn, mergeTargetKg) + }); err != nil { + return err + } + // Update the keyspace group cache. + m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg) + for _, kgID := range mergeList { + kg := groups[kgID] + m.groups[endpoint.StringUserKind(kg.UserKind)].Remove(kgID) + } + return nil +} + +// FinishMergeKeyspaceByID finishes the merging keyspace group by the merge target ID. +func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { + var mergeTargetKg *endpoint.KeyspaceGroup + m.Lock() + defer m.Unlock() + if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + // Load the merge target keyspace group first. + mergeTargetKg, err = m.store.LoadKeyspaceGroup(txn, mergeTargetID) + if err != nil { + return err + } + if mergeTargetKg == nil { + return ErrKeyspaceGroupNotExists + } + // Check if it's in the merging state. + if !mergeTargetKg.IsMergeTarget() { + return ErrKeyspaceGroupNotInMerging + } + // Make sure all merging keyspace groups are deleted. + for _, kgID := range mergeTargetKg.MergeState.MergeList { + kg, err := m.store.LoadKeyspaceGroup(txn, kgID) + if err != nil { + return err + } + if kg != nil { + return ErrKeyspaceGroupNotInMerging + } + } + mergeTargetKg.MergeState = nil + return m.store.SaveKeyspaceGroup(txn, mergeTargetKg) + }); err != nil { + return err + } + // Update the keyspace group cache. + m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg) + return nil +} diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index df1aa49ee37..6d088770fb5 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -84,11 +84,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() { re.NoError(err) re.Len(kgs, 2) // get the default keyspace group - kg, err := suite.kgm.GetKeyspaceGroupByID(0) + kg, err := suite.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.Equal(uint32(0), kg.ID) re.Equal(endpoint.Basic.String(), kg.UserKind) re.False(kg.IsSplitting()) + // get the keyspace group 3 kg, err = suite.kgm.GetKeyspaceGroupByID(3) re.NoError(err) re.Equal(uint32(3), kg.ID) @@ -320,3 +321,70 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444}) re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup) } + +func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() { + re := suite.Require() + + keyspaceGroups := []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: endpoint.Basic.String(), + Keyspaces: []uint32{111, 222, 333}, + Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount), + }, + { + ID: uint32(3), + UserKind: endpoint.Basic.String(), + Keyspaces: []uint32{444, 555}, + }, + } + err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups) + re.NoError(err) + // split the keyspace group 1 to 2 + err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{333}) + re.NoError(err) + // finish the split of the keyspace group 2 + err = suite.kgm.FinishSplitKeyspaceByID(2) + re.NoError(err) + // check the keyspace group 1 and 2 + kg1, err := suite.kgm.GetKeyspaceGroupByID(1) + re.NoError(err) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222}, kg1.Keyspaces) + re.False(kg1.IsSplitting()) + re.False(kg1.IsMerging()) + kg2, err := suite.kgm.GetKeyspaceGroupByID(2) + re.NoError(err) + re.Equal(uint32(2), kg2.ID) + re.Equal([]uint32{333}, kg2.Keyspaces) + re.False(kg2.IsSplitting()) + re.False(kg2.IsMerging()) + re.Equal(kg1.UserKind, kg2.UserKind) + re.Equal(kg1.Members, kg2.Members) + // merge the keyspace group 2 and 3 back into 1 + err = suite.kgm.MergeKeyspaceGroups(1, []uint32{2, 3}) + re.NoError(err) + // check the keyspace group 2 and 3 + kg2, err = suite.kgm.GetKeyspaceGroupByID(2) + re.NoError(err) + re.Nil(kg2) + kg3, err := suite.kgm.GetKeyspaceGroupByID(3) + re.NoError(err) + re.Nil(kg3) + // check the keyspace group 1 + kg1, err = suite.kgm.GetKeyspaceGroupByID(1) + re.NoError(err) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333, 444, 555}, kg1.Keyspaces) + re.False(kg1.IsSplitting()) + re.True(kg1.IsMerging()) + // finish the merging + err = suite.kgm.FinishMergeKeyspaceByID(1) + re.NoError(err) + kg1, err = suite.kgm.GetKeyspaceGroupByID(1) + re.NoError(err) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333, 444, 555}, kg1.Keyspaces) + re.False(kg1.IsSplitting()) + re.False(kg1.IsMerging()) +} diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index ac27f3aafd4..84107fe617f 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -54,6 +54,10 @@ var ( ErrKeyspaceGroupInSplit = errors.New("keyspace group is in split state") // ErrKeyspaceGroupNotInSplit is used to indicate target keyspace group is not in split state. ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state") + // ErrKeyspaceGroupInMerging is used to indicate target keyspace group is in merging state. + ErrKeyspaceGroupInMerging = errors.New("keyspace group is in merging state") + // ErrKeyspaceGroupNotInMerging is used to indicate target keyspace group is not in merging state. + ErrKeyspaceGroupNotInMerging = errors.New("keyspace group is not in merging state") // ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group. ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group") // ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group. diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index f63922ea64b..5b0e45a547c 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/kv" "go.etcd.io/etcd/clientv3" ) @@ -83,12 +84,20 @@ type SplitState struct { SplitSource uint32 `json:"split-source"` } +// MergeState defines the merging state of a keyspace group. +type MergeState struct { + // MergeList is the list of keyspace group IDs which are merging to this target keyspace group. + MergeList []uint32 `json:"merge-list"` +} + // KeyspaceGroup is the keyspace group. type KeyspaceGroup struct { ID uint32 `json:"id"` UserKind string `json:"user-kind"` // SplitState is the current split state of the keyspace group. SplitState *SplitState `json:"split-state,omitempty"` + // MergeState is the current merging state of the keyspace group. + MergeState *MergeState `json:"merge-state,omitempty"` // Members are the election members which campaign for the primary of the keyspace group. Members []KeyspaceGroupMember `json:"members"` // Keyspaces are the keyspace IDs which belong to the keyspace group. @@ -122,6 +131,21 @@ func (kg *KeyspaceGroup) SplitSource() uint32 { return 0 } +// IsMerging checks if the keyspace group is in merging state. +func (kg *KeyspaceGroup) IsMerging() bool { + return kg != nil && kg.MergeState != nil +} + +// IsMergeTarget checks if the keyspace group is in merging state and is the merge target. +func (kg *KeyspaceGroup) IsMergeTarget() bool { + return kg.IsMerging() && !slice.Contains(kg.MergeState.MergeList, kg.ID) +} + +// IsMergeSource checks if the keyspace group is in merging state and is the merge source. +func (kg *KeyspaceGroup) IsMergeSource() bool { + return kg.IsMerging() && slice.Contains(kg.MergeState.MergeList, kg.ID) +} + // KeyspaceGroupStorage is the interface for keyspace group storage. type KeyspaceGroupStorage interface { LoadKeyspaceGroups(startID uint32, limit int) ([]*KeyspaceGroup, error) diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index 02a4c49fc86..0dca56709b5 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -42,6 +42,8 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) { router.POST("/:id/nodes", SetNodesForKeyspaceGroup) router.POST("/:id/split", SplitKeyspaceGroupByID) router.DELETE("/:id/split", FinishSplitKeyspaceByID) + router.POST("/:id/merge", MergeKeyspaceGroups) + router.DELETE("/:id/merge", FinishMergeKeyspaceByID) } // CreateKeyspaceGroupParams defines the params for creating keyspace groups. @@ -237,6 +239,68 @@ func FinishSplitKeyspaceByID(c *gin.Context) { c.JSON(http.StatusOK, nil) } +// MergeKeyspaceGroupsParams defines the params for merging the keyspace groups. +type MergeKeyspaceGroupsParams struct { + MergeList []uint32 `json:"merge-list"` +} + +// MergeKeyspaceGroups merges the keyspace groups in the merge list into the target keyspace group. +func MergeKeyspaceGroups(c *gin.Context) { + id, err := validateKeyspaceGroupID(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + mergeParams := &MergeKeyspaceGroupsParams{} + err = c.BindJSON(mergeParams) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) + return + } + if len(mergeParams.MergeList) == 0 { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty merge list") + return + } + for _, mergeID := range mergeParams.MergeList { + if !isValid(mergeID) { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + } + + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + // Split keyspace group. + groupManager := svr.GetKeyspaceGroupManager() + if groupManager == nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + return + } + err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusOK, nil) +} + +// FinishMergeKeyspaceByID finishes merging keyspace group by ID. +func FinishMergeKeyspaceByID(c *gin.Context) { + id, err := validateKeyspaceGroupID(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + manager := svr.GetKeyspaceGroupManager() + err = manager.FinishMergeKeyspaceByID(id) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusOK, nil) +} + // AllocNodesForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups. type AllocNodesForKeyspaceGroupParams struct { Replica int `json:"replica"` From 9a432291f064dcc884b7af7a51b3adc00f94d023 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 14 Jun 2023 10:27:56 +0800 Subject: [PATCH 2/3] Address the comments Signed-off-by: JmPotato --- pkg/keyspace/keyspace.go | 18 +++++++++--------- pkg/keyspace/keyspace_test.go | 6 +++--- pkg/keyspace/tso_keyspace_group.go | 20 +++++++++++++++++--- pkg/keyspace/util.go | 8 +++++--- 4 files changed, 34 insertions(+), 18 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index c257c144898..f5c4fd1aea8 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -49,10 +49,10 @@ const ( UserKindKey = "user_kind" // TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config. TSOKeyspaceGroupIDKey = "tso_keyspace_group_id" - // keyspacePatrolBatchSize is the batch size for keyspace assignment patrol. - // the limit of etcd txn op is 128, keyspacePatrolBatchSize need to be less than it. + // maxEtcdTxnOps is the batch size for operating etcd. The limit of etcd txn op is 128. + // We use 120 here to leave some space for other operations. // See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61 - keyspacePatrolBatchSize = 120 + maxEtcdTxnOps = 120 ) // Config is the interface for keyspace config. @@ -669,7 +669,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { zap.Duration("cost", time.Since(start)), zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount), zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount), - zap.Int("batch-size", keyspacePatrolBatchSize), + zap.Int("batch-size", maxEtcdTxnOps), zap.Uint32("current-start-id", currentStartID), zap.Uint32("next-start-id", nextStartID), ) @@ -691,7 +691,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { if defaultKeyspaceGroup.IsMerging() { return ErrKeyspaceGroupInMerging } - keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, keyspacePatrolBatchSize) + keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps) if err != nil { return err } @@ -701,9 +701,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { currentStartID = keyspaces[0].GetId() nextStartID = keyspaces[keyspaceNum-1].GetId() + 1 } - // If there are less than `keyspacePatrolBatchSize` keyspaces, + // If there are less than `maxEtcdTxnOps` keyspaces, // we have reached the end of the keyspace list. - moreToPatrol = keyspaceNum == keyspacePatrolBatchSize + moreToPatrol = keyspaceNum == maxEtcdTxnOps var ( assigned = false keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum) @@ -738,7 +738,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { err = manager.store.SaveKeyspaceMeta(txn, ks) if err != nil { log.Error("[keyspace] failed to save keyspace meta during patrol", - zap.Int("batch-size", keyspacePatrolBatchSize), + zap.Int("batch-size", maxEtcdTxnOps), zap.Uint32("current-start-id", currentStartID), zap.Uint32("next-start-id", nextStartID), zap.Uint32("keyspace-id", ks.Id), zap.Error(err)) @@ -750,7 +750,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup) if err != nil { log.Error("[keyspace] failed to save default keyspace group meta during patrol", - zap.Int("batch-size", keyspacePatrolBatchSize), + zap.Int("batch-size", maxEtcdTxnOps), zap.Uint32("current-start-id", currentStartID), zap.Uint32("next-start-id", nextStartID), zap.Error(err)) return err diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index dadc2a2509f..45c4bc90be2 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -405,7 +405,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() { func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { re := suite.Require() // Create some keyspaces without any keyspace group. - for i := 1; i < keyspacePatrolBatchSize*2+1; i++ { + for i := 1; i < maxEtcdTxnOps*2+1; i++ { now := time.Now().Unix() err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ Id: uint32(i), @@ -420,7 +420,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < keyspacePatrolBatchSize*2+1; i++ { + for i := 1; i < maxEtcdTxnOps*2+1; i++ { re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } // Patrol the keyspace assignment. @@ -430,7 +430,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < keyspacePatrolBatchSize*2+1; i++ { + for i := 1; i < maxEtcdTxnOps*2+1; i++ { re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } } diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index fe04677194a..6d83d9eedda 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -17,6 +17,7 @@ package keyspace import ( "context" "encoding/json" + "sort" "strconv" "strings" "sync" @@ -765,6 +766,9 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin if mergeListNum == 0 { return nil } + if mergeListNum > maxEtcdTxnOps { + return ErrExceedMaxEtcdTxnOps + } var ( groups = make(map[uint32]*endpoint.KeyspaceGroup, mergeListNum+1) mergeTargetKg *endpoint.KeyspaceGroup @@ -792,18 +796,28 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin groups[kgID] = kg } mergeTargetKg = groups[mergeTargetID] + keyspaces := make(map[uint32]struct{}, len(mergeTargetKg.Keyspaces)) + for _, keyspace := range mergeTargetKg.Keyspaces { + keyspaces[keyspace] = struct{}{} + } // Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group. for _, kgID := range mergeList { kg := groups[kgID] for _, keyspace := range kg.Keyspaces { - if !slice.Contains(mergeTargetKg.Keyspaces, keyspace) { - mergeTargetKg.Keyspaces = append(mergeTargetKg.Keyspaces, keyspace) - } + keyspaces[keyspace] = struct{}{} } if err := m.store.DeleteKeyspaceGroup(txn, kg.ID); err != nil { return err } } + mergedKeyspaces := make([]uint32, 0, len(keyspaces)) + for keyspace := range keyspaces { + mergedKeyspaces = append(mergedKeyspaces, keyspace) + } + sort.Slice(mergedKeyspaces, func(i, j int) bool { + return mergedKeyspaces[i] < mergedKeyspaces[j] + }) + mergeTargetKg.Keyspaces = mergedKeyspaces // Update the merge state of the target keyspace group. mergeTargetKg.MergeState = &endpoint.MergeState{ MergeList: mergeList, diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index 84107fe617f..041e8fc0494 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -63,9 +63,11 @@ var ( // ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group. ErrKeyspaceGroupNotEnoughReplicas = errors.New("not enough replicas in the keyspace group") // ErrNoAvailableNode is used to indicate no available node in the keyspace group. - ErrNoAvailableNode = errors.New("no available node") - errModifyDefault = errors.New("cannot modify default keyspace's state") - errIllegalOperation = errors.New("unknown operation") + ErrNoAvailableNode = errors.New("no available node") + // ErrExceedMaxEtcdTxnOps is used to indicate the number of etcd txn operations exceeds the limit. + ErrExceedMaxEtcdTxnOps = errors.New("exceed max etcd txn operations") + errModifyDefault = errors.New("cannot modify default keyspace's state") + errIllegalOperation = errors.New("unknown operation") // stateTransitionTable lists all allowed next state for the given current state. // Note that transit from any state to itself is allowed for idempotence. From 90ae755628a525aa1d4a4dc3a70a11231c6ef996 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 14 Jun 2023 11:59:19 +0800 Subject: [PATCH 3/3] Address the comments Signed-off-by: JmPotato --- pkg/keyspace/keyspace.go | 2 +- pkg/keyspace/tso_keyspace_group.go | 8 ++++++-- pkg/keyspace/tso_keyspace_group_test.go | 7 +++++++ server/apiv2/handlers/tso_keyspace_group.go | 4 ++-- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index f5c4fd1aea8..dad4e350fe9 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -49,7 +49,7 @@ const ( UserKindKey = "user_kind" // TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config. TSOKeyspaceGroupIDKey = "tso_keyspace_group_id" - // maxEtcdTxnOps is the batch size for operating etcd. The limit of etcd txn op is 128. + // maxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128. // We use 120 here to leave some space for other operations. // See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61 maxEtcdTxnOps = 120 diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 6d83d9eedda..c58dac46128 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -766,7 +766,11 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin if mergeListNum == 0 { return nil } - if mergeListNum > maxEtcdTxnOps { + // The transaction below will: + // - Load and delete the keyspace groups in the merge list. + // - Load and update the target keyspace group. + // So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction. + if (mergeListNum+1)*2 > maxEtcdTxnOps { return ErrExceedMaxEtcdTxnOps } var ( @@ -796,7 +800,7 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin groups[kgID] = kg } mergeTargetKg = groups[mergeTargetID] - keyspaces := make(map[uint32]struct{}, len(mergeTargetKg.Keyspaces)) + keyspaces := make(map[uint32]struct{}) for _, keyspace := range mergeTargetKg.Keyspaces { keyspaces[keyspace] = struct{}{} } diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index 6d088770fb5..f4f8bcd69ff 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -387,4 +387,11 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() { re.Equal([]uint32{111, 222, 333, 444, 555}, kg1.Keyspaces) re.False(kg1.IsSplitting()) re.False(kg1.IsMerging()) + + // merge a non-existing keyspace group + err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5}) + re.ErrorIs(err, ErrKeyspaceGroupNotExists) + // merge with the number of keyspace groups exceeds the limit + err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, maxEtcdTxnOps/2)) + re.ErrorIs(err, ErrExceedMaxEtcdTxnOps) } diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index 0dca56709b5..38bb9b86e74 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -207,12 +207,12 @@ func SplitKeyspaceGroupByID(c *gin.Context) { patrolKeyspaceAssignmentState.patrolled = true } patrolKeyspaceAssignmentState.Unlock() - // Split keyspace group. groupManager := svr.GetKeyspaceGroupManager() if groupManager == nil { c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) return } + // Split keyspace group. err = groupManager.SplitKeyspaceGroupByID(id, splitParams.NewID, splitParams.Keyspaces) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) @@ -269,12 +269,12 @@ func MergeKeyspaceGroups(c *gin.Context) { } svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) - // Split keyspace group. groupManager := svr.GetKeyspaceGroupManager() if groupManager == nil { c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) return } + // Merge keyspace group. err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())