Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace, api: support the keyspace group split #6293

Merged
merged 10 commits into from
Apr 11, 2023
Merged
105 changes: 95 additions & 10 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (m *GroupManager) GetKeyspaceGroups(startID uint32, limit int) ([]*endpoint
return m.store.LoadKeyspaceGroups(startID, limit)
}

// GetKeyspaceGroupByID returns the keyspace group by id.
// GetKeyspaceGroupByID returns the keyspace group by ID.
func (m *GroupManager) GetKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGroup, error) {
var (
kg *endpoint.KeyspaceGroup
Expand All @@ -135,7 +135,7 @@ func (m *GroupManager) GetKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGroup,
return kg, nil
}

// DeleteKeyspaceGroupByID deletes the keyspace group by id.
// DeleteKeyspaceGroupByID deletes the keyspace group by ID.
func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGroup, error) {
var (
kg *endpoint.KeyspaceGroup
Expand Down Expand Up @@ -165,15 +165,11 @@ func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGro
return kg, nil
}

// saveKeyspaceGroups will try to save the given keyspace groups into the storage.
// If any keyspace group already exists and `overwrite` is false, it will return ErrKeyspaceGroupExists.
func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup, overwrite bool) error {
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
for _, keyspaceGroup := range keyspaceGroups {
// TODO: add replica count
newKG := &endpoint.KeyspaceGroup{
ID: keyspaceGroup.ID,
UserKind: keyspaceGroup.UserKind,
Keyspaces: keyspaceGroup.Keyspaces,
}
// Check if keyspace group has already existed.
oldKG, err := m.store.LoadKeyspaceGroup(txn, keyspaceGroup.ID)
if err != nil {
Expand All @@ -182,13 +178,19 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
if oldKG != nil && !overwrite {
return ErrKeyspaceGroupExists
}
m.store.SaveKeyspaceGroup(txn, newKG)
m.store.SaveKeyspaceGroup(txn, &endpoint.KeyspaceGroup{
ID: keyspaceGroup.ID,
UserKind: keyspaceGroup.UserKind,
Members: keyspaceGroup.Members,
Keyspaces: keyspaceGroup.Keyspaces,
InSplit: keyspaceGroup.InSplit,
})
}
return nil
})
}

// GetAvailableKeyspaceGroupIDByKind returns the available keyspace group id by user kind.
// GetAvailableKeyspaceGroupIDByKind returns the available keyspace group ID by user kind.
func (m *GroupManager) GetAvailableKeyspaceGroupIDByKind(userKind endpoint.UserKind) (string, error) {
m.RLock()
defer m.RUnlock()
Expand Down Expand Up @@ -278,3 +280,86 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse

return nil
}

// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(id, newID uint32, keyspaces []uint32) error {
// TODO: avoid to split when the keyspaces is empty.
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
// Load the old keyspace group first.
oldKg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
if oldKg == nil {
return ErrKeyspaceGroupNotFound
}
// Check if the new keyspace group already exists.
newKg, err := m.store.LoadKeyspaceGroup(txn, newID)
if err != nil {
return err
}
if newKg != nil {
return ErrKeyspaceGroupExists
}
// Check if the keyspaces are all in the old keyspace group.
if len(keyspaces) > len(oldKg.Keyspaces) {
return ErrKeyspaceNotInKeyspaceGroup
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, len(oldKg.Keyspaces))
newKeyspaceMap = make(map[uint32]struct{}, len(keyspaces))
)
for _, keyspace := range oldKg.Keyspaces {
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range keyspaces {
if _, ok := oldKeyspaceMap[keyspace]; !ok {
return ErrKeyspaceNotInKeyspaceGroup
}
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace group for the old keyspace group.
splitKeyspaces := make([]uint32, 0, len(oldKg.Keyspaces)-len(keyspaces))
for _, keyspace := range oldKg.Keyspaces {
if _, ok := newKeyspaceMap[keyspace]; !ok {
splitKeyspaces = append(splitKeyspaces, keyspace)
}
}
// Update the old keyspace group.
oldKg.Keyspaces = splitKeyspaces
if err = m.store.SaveKeyspaceGroup(txn, oldKg); err != nil {
return err
}
// Create the new split keyspace group.
return m.store.SaveKeyspaceGroup(txn, &endpoint.KeyspaceGroup{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though, the put events to the old and new groups will be propagated asynchronously, we should still update/create both keyspace groups in a transaction to provide atomic operation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole SplitKeyspaceGroupByID is in a txn.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

ID: newID,
// Keep the same user kind and members as the old keyspace group.
UserKind: oldKg.UserKind,
Members: oldKg.Members,
Keyspaces: keyspaces,
// Only set the new keyspace group in split state.
InSplit: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need one more field to denote this keyspace group is split from which keyspace group?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At present, I haven't thought of any scenarios where this field would be useful. Perhaps we can add it when we actually need it.

})
})
}

// FinishSplitKeyspaceByID finishes the split keyspace group by ID.
func (m *GroupManager) FinishSplitKeyspaceByID(id uint32) error {
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
// Load the keyspace group first.
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
if kg == nil {
return ErrKeyspaceGroupNotFound
}
// Check if it's in the split state.
if !kg.InSplit {
return ErrKeyspaceGroupNotInSplit
}
kg.InSplit = false
return m.store.SaveKeyspaceGroup(txn, kg)
})
}
60 changes: 57 additions & 3 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() {
UserKind: endpoint.Standard.String(),
},
{
ID: uint32(2),
UserKind: endpoint.Standard.String(),
ID: uint32(2),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
},
{
ID: uint32(3),
Expand All @@ -86,10 +87,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() {
re.NoError(err)
re.Equal(uint32(0), kg.ID)
re.Equal(endpoint.Basic.String(), kg.UserKind)
re.False(kg.InSplit)
kg, err = suite.kgm.GetKeyspaceGroupByID(3)
re.NoError(err)
re.Equal(uint32(3), kg.ID)
re.Equal(endpoint.Standard.String(), kg.UserKind)
re.False(kg.InSplit)
// remove the keyspace group 3
kg, err = suite.kgm.DeleteKeyspaceGroupByID(3)
re.NoError(err)
Expand All @@ -98,7 +101,6 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() {
kg, err = suite.kgm.GetKeyspaceGroupByID(3)
re.NoError(err)
re.Empty(kg)

// create an existing keyspace group
keyspaceGroups = []*endpoint.KeyspaceGroup{{ID: uint32(1), UserKind: endpoint.Standard.String()}}
err = suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
Expand Down Expand Up @@ -227,3 +229,55 @@ func (suite *keyspaceGroupTestSuite) TestUpdateKeyspace() {
re.NoError(err)
re.Len(kg3.Keyspaces, 1)
}

func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
re := suite.Require()

keyspaceGroups := []*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Basic.String(),
},
{
ID: uint32(2),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
re.NoError(err)
// split the keyspace group 2 to 4
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333})
re.NoError(err)
kg2, err := suite.kgm.GetKeyspaceGroupByID(2)
re.NoError(err)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{111, 222}, kg2.Keyspaces)
re.False(kg2.InSplit)
kg4, err := suite.kgm.GetKeyspaceGroupByID(4)
re.NoError(err)
re.Equal(uint32(4), kg4.ID)
re.Equal([]uint32{333}, kg4.Keyspaces)
re.True(kg4.InSplit)
re.Equal(kg2.UserKind, kg4.UserKind)
re.Equal(kg2.Members, kg4.Members)
// finish the split of keyspace group 4
err = suite.kgm.FinishSplitKeyspaceByID(4)
re.NoError(err)
kg4, err = suite.kgm.GetKeyspaceGroupByID(4)
re.NoError(err)
re.Equal(uint32(4), kg4.ID)
re.False(kg4.InSplit)
// split a non-existing keyspace group
err = suite.kgm.SplitKeyspaceGroupByID(3, 5, nil)
re.ErrorIs(err, ErrKeyspaceGroupNotFound)
// finish the split of a non-existing keyspace group
err = suite.kgm.FinishSplitKeyspaceByID(5)
re.ErrorIs(err, ErrKeyspaceGroupNotFound)
// split into an existing keyspace group
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, nil)
re.ErrorIs(err, ErrKeyspaceGroupExists)
// split with the wrong keyspaces.
err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444})
re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup)
}
10 changes: 8 additions & 2 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ var (
ErrKeyspaceExists = errors.New("keyspace already exists")
// ErrKeyspaceGroupExists indicates target keyspace group already exists.
ErrKeyspaceGroupExists = errors.New("keyspace group already exists")
errModifyDefault = errors.New("cannot modify default keyspace's state")
errIllegalOperation = errors.New("unknown operation")
// ErrKeyspaceGroupNotFound is used to indicate target keyspace group does not exist.
ErrKeyspaceGroupNotFound = errors.New("keyspace group does not exist")
// ErrKeyspaceGroupNotInSplit is used to indicate target keyspace group is not in split state.
ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state")
// ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group.
ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group")
errModifyDefault = errors.New("cannot modify default keyspace's state")
errIllegalOperation = errors.New("unknown operation")

// stateTransitionTable lists all allowed next state for the given current state.
// Note that transit from any state to itself is allowed for idempotence.
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type KeyspaceGroupMember struct {
type KeyspaceGroup struct {
ID uint32 `json:"id"`
UserKind string `json:"user-kind"`
// InSplit indicates whether the keyspace group is in split.
InSplit bool `json:"in-split"`
// Members are the election members which campaign for the primary of the keyspace group.
Members []KeyspaceGroupMember `json:"members"`
// Keyspaces are the keyspace IDs which belong to the keyspace group.
Expand All @@ -90,7 +92,7 @@ type KeyspaceGroupStorage interface {

var _ KeyspaceGroupStorage = (*StorageEndpoint)(nil)

// LoadKeyspaceGroup loads the keyspace group by id.
// LoadKeyspaceGroup loads the keyspace group by ID.
func (se *StorageEndpoint) LoadKeyspaceGroup(txn kv.Txn, id uint32) (*KeyspaceGroup, error) {
value, err := txn.Load(KeyspaceGroupIDPath(id))
if err != nil || value == "" {
Expand Down
4 changes: 2 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,8 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
log.Warn("failed to unmarshal keyspace group",
zap.Uint32("keysapce-group-id", id),
zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()))
} else {
kgm.updateKeyspaceGroup(group)
}
kgm.updateKeyspaceGroup(group)
case clientv3.EventTypeDelete:
kgm.deleteKeyspaceGroup(id)
}
Expand Down Expand Up @@ -466,6 +465,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
zap.String("participant-name", uniqueName),
zap.Uint64("participant-id", uniqueID))

// TODO: handle the keyspace group & TSO split logic.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)),
Expand Down
Loading