Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace, apiv2: implement the keyspace group merging API #6594

Merged
merged 4 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, keyspacePatrolBatchSize)
if err != nil {
return err
Expand Down
125 changes: 120 additions & 5 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
if oldKG.IsSplitting() && overwrite {
return ErrKeyspaceGroupInSplit
}
if oldKG.IsMerging() && overwrite {
return ErrKeyspaceGroupInMerging
}
newKG := &endpoint.KeyspaceGroup{
ID: keyspaceGroup.ID,
UserKind: keyspaceGroup.UserKind,
Expand Down Expand Up @@ -415,6 +418,9 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}

changed := false

Expand Down Expand Up @@ -469,6 +475,9 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
if oldKG.IsSplitting() || newKG.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if oldKG.IsMerging() || newKG.IsMerging() {
return ErrKeyspaceGroupInMerging
}

var updateOld, updateNew bool
if !slice.Contains(newKG.Keyspaces, keyspaceID) {
Expand Down Expand Up @@ -516,6 +525,10 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
if splitSourceKg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
// A keyspace group can not be split when it is in merging.
if splitSourceKg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.KeyspaceGroupDefaultReplicaCount {
return ErrKeyspaceGroupNotEnoughReplicas
Expand Down Expand Up @@ -618,11 +631,7 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
if err != nil {
return err
}
err = m.store.SaveKeyspaceGroup(txn, splitSourceKg)
if err != nil {
return err
}
return nil
return m.store.SaveKeyspaceGroup(txn, splitSourceKg)
}); err != nil {
return err
}
Expand Down Expand Up @@ -663,6 +672,9 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
exists := make(map[string]struct{})
for _, member := range kg.Members {
exists[member.Address] = struct{}{}
Expand Down Expand Up @@ -719,6 +731,9 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
Expand All @@ -743,3 +758,103 @@ func (m *GroupManager) IsExistNode(addr string) bool {
}
return false
}

// MergeKeyspaceGroups merges the keyspace group in the list into the target keyspace group.
func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uint32) error {
mergeListNum := len(mergeList)
if mergeListNum == 0 {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
var (
groups = make(map[uint32]*endpoint.KeyspaceGroup, mergeListNum+1)
mergeTargetKg *endpoint.KeyspaceGroup
)
m.Lock()
defer m.Unlock()
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load and check all keyspace groups first.
for _, kgID := range append(mergeList, mergeTargetID) {
kg, err := m.store.LoadKeyspaceGroup(txn, kgID)
if err != nil {
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
}
// A keyspace group can not be merged if it's in splitting.
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
// A keyspace group can not be split when it is in merging.
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
groups[kgID] = kg
}
mergeTargetKg = groups[mergeTargetID]
// Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group.
for _, kgID := range mergeList {
kg := groups[kgID]
for _, keyspace := range kg.Keyspaces {
if !slice.Contains(mergeTargetKg.Keyspaces, keyspace) {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
mergeTargetKg.Keyspaces = append(mergeTargetKg.Keyspaces, keyspace)
}
}
if err := m.store.DeleteKeyspaceGroup(txn, kg.ID); err != nil {
return err
}
}
// Update the merge state of the target keyspace group.
mergeTargetKg.MergeState = &endpoint.MergeState{
MergeList: mergeList,
}
return m.store.SaveKeyspaceGroup(txn, mergeTargetKg)
}); err != nil {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg)
for _, kgID := range mergeList {
kg := groups[kgID]
m.groups[endpoint.StringUserKind(kg.UserKind)].Remove(kgID)
}
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// FinishMergeKeyspaceByID finishes the merging keyspace group by the merge target ID.
func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
var mergeTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the merge target keyspace group first.
mergeTargetKg, err = m.store.LoadKeyspaceGroup(txn, mergeTargetID)
if err != nil {
return err
}
if mergeTargetKg == nil {
return ErrKeyspaceGroupNotExists
}
// Check if it's in the merging state.
if !mergeTargetKg.IsMergeTarget() {
return ErrKeyspaceGroupNotInMerging
}
// Make sure all merging keyspace groups are deleted.
for _, kgID := range mergeTargetKg.MergeState.MergeList {
kg, err := m.store.LoadKeyspaceGroup(txn, kgID)
if err != nil {
return err
}
if kg != nil {
return ErrKeyspaceGroupNotInMerging
}
}
mergeTargetKg.MergeState = nil
return m.store.SaveKeyspaceGroup(txn, mergeTargetKg)
}); err != nil {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg)
return nil
}
70 changes: 69 additions & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() {
re.NoError(err)
re.Len(kgs, 2)
// get the default keyspace group
kg, err := suite.kgm.GetKeyspaceGroupByID(0)
kg, err := suite.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.Equal(uint32(0), kg.ID)
re.Equal(endpoint.Basic.String(), kg.UserKind)
re.False(kg.IsSplitting())
// get the keyspace group 3
kg, err = suite.kgm.GetKeyspaceGroupByID(3)
re.NoError(err)
re.Equal(uint32(3), kg.ID)
Expand Down Expand Up @@ -320,3 +321,70 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444})
re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup)
}

func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
re := suite.Require()

keyspaceGroups := []*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
},
{
ID: uint32(3),
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{444, 555},
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
re.NoError(err)
// split the keyspace group 1 to 2
err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{333})
re.NoError(err)
// finish the split of the keyspace group 2
err = suite.kgm.FinishSplitKeyspaceByID(2)
re.NoError(err)
// check the keyspace group 1 and 2
kg1, err := suite.kgm.GetKeyspaceGroupByID(1)
re.NoError(err)
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{111, 222}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
re.False(kg1.IsMerging())
kg2, err := suite.kgm.GetKeyspaceGroupByID(2)
re.NoError(err)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{333}, kg2.Keyspaces)
re.False(kg2.IsSplitting())
re.False(kg2.IsMerging())
re.Equal(kg1.UserKind, kg2.UserKind)
re.Equal(kg1.Members, kg2.Members)
// merge the keyspace group 2 and 3 back into 1
err = suite.kgm.MergeKeyspaceGroups(1, []uint32{2, 3})
re.NoError(err)
// check the keyspace group 2 and 3
kg2, err = suite.kgm.GetKeyspaceGroupByID(2)
re.NoError(err)
re.Nil(kg2)
kg3, err := suite.kgm.GetKeyspaceGroupByID(3)
re.NoError(err)
re.Nil(kg3)
// check the keyspace group 1
kg1, err = suite.kgm.GetKeyspaceGroupByID(1)
re.NoError(err)
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{111, 222, 333, 444, 555}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
re.True(kg1.IsMerging())
// finish the merging
err = suite.kgm.FinishMergeKeyspaceByID(1)
re.NoError(err)
kg1, err = suite.kgm.GetKeyspaceGroupByID(1)
re.NoError(err)
re.Equal(uint32(1), kg1.ID)
re.Equal([]uint32{111, 222, 333, 444, 555}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
re.False(kg1.IsMerging())
}
4 changes: 4 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ var (
ErrKeyspaceGroupInSplit = errors.New("keyspace group is in split state")
// ErrKeyspaceGroupNotInSplit is used to indicate target keyspace group is not in split state.
ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state")
// ErrKeyspaceGroupInMerging is used to indicate target keyspace group is in merging state.
ErrKeyspaceGroupInMerging = errors.New("keyspace group is in merging state")
// ErrKeyspaceGroupNotInMerging is used to indicate target keyspace group is not in merging state.
ErrKeyspaceGroupNotInMerging = errors.New("keyspace group is not in merging state")
// ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group.
ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group")
// ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group.
Expand Down
24 changes: 24 additions & 0 deletions pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"

"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/kv"
"go.etcd.io/etcd/clientv3"
)
Expand Down Expand Up @@ -83,12 +84,20 @@ type SplitState struct {
SplitSource uint32 `json:"split-source"`
}

// MergeState defines the merging state of a keyspace group.
type MergeState struct {
// MergeList is the list of keyspace group IDs which are merging to this target keyspace group.
MergeList []uint32 `json:"merge-list"`
}

// KeyspaceGroup is the keyspace group.
type KeyspaceGroup struct {
ID uint32 `json:"id"`
UserKind string `json:"user-kind"`
// SplitState is the current split state of the keyspace group.
SplitState *SplitState `json:"split-state,omitempty"`
// MergeState is the current merging state of the keyspace group.
MergeState *MergeState `json:"merge-state,omitempty"`
// Members are the election members which campaign for the primary of the keyspace group.
Members []KeyspaceGroupMember `json:"members"`
// Keyspaces are the keyspace IDs which belong to the keyspace group.
Expand Down Expand Up @@ -122,6 +131,21 @@ func (kg *KeyspaceGroup) SplitSource() uint32 {
return 0
}

// IsMerging checks if the keyspace group is in merging state.
func (kg *KeyspaceGroup) IsMerging() bool {
return kg != nil && kg.MergeState != nil
}

// IsMergeTarget checks if the keyspace group is in merging state and is the merge target.
func (kg *KeyspaceGroup) IsMergeTarget() bool {
return kg.IsMerging() && !slice.Contains(kg.MergeState.MergeList, kg.ID)
}

// IsMergeSource checks if the keyspace group is in merging state and is the merge source.
func (kg *KeyspaceGroup) IsMergeSource() bool {
return kg.IsMerging() && slice.Contains(kg.MergeState.MergeList, kg.ID)
}

// KeyspaceGroupStorage is the interface for keyspace group storage.
type KeyspaceGroupStorage interface {
LoadKeyspaceGroups(startID uint32, limit int) ([]*KeyspaceGroup, error)
Expand Down
Loading