Skip to content

Commit

Permalink
tso, member: support TSO split based on keyspace group split (#6313)
Browse files Browse the repository at this point in the history
ref #6232

Support TSO split based on keyspace group split.

Signed-off-by: JmPotato <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
JmPotato and ti-chi-bot committed Apr 18, 2023
1 parent b9a03d2 commit e15b211
Show file tree
Hide file tree
Showing 21 changed files with 702 additions and 358 deletions.
40 changes: 25 additions & 15 deletions errors.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,6 @@
# AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen
# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER.

["ErrLoadKeyspaceGroupsRetryExhausted"]
error = '''
load keyspace groups retry exhausted, %s
'''

["ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace groups terminated
'''

["ErrLoadKeyspaceGroupsTimeout"]
error = '''
load keyspace groups timeout
'''

["PD:ErrEncryptionKMS"]
error = '''
KMS error
Expand Down Expand Up @@ -506,6 +491,11 @@ error = '''
marshal leader failed
'''

["PD:member:ErrPreCheckCampaign"]
error = '''
pre-check campaign failed
'''

["PD:netstat:ErrNetstatTCPSocks"]
error = '''
TCP socks error
Expand Down Expand Up @@ -761,11 +751,31 @@ error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrKeyspaceGroupNotInitialized"]
error = '''
the keyspace group %d isn't initialized
'''

["PD:tso:ErrKeyspaceNotAssigned"]
error = '''
the keyspace %d isn't assigned to any keyspace group
'''

["PD:tso:ErrLoadKeyspaceGroupsRetryExhausted"]
error = '''
load keyspace groups retry exhausted, %s
'''

["PD:tso:ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace groups terminated
'''

["PD:tso:ErrLoadKeyspaceGroupsTimeout"]
error = '''
load keyspace groups timeout
'''

["PD:tso:ErrLogicOverflow"]
error = '''
logic part overflow
Expand Down
8 changes: 5 additions & 3 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,18 @@ var (
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated"))
ErrLoadKeyspaceGroupsRetryExhausted = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("ErrLoadKeyspaceGroupsRetryExhausted"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsTerminated"))
ErrLoadKeyspaceGroupsRetryExhausted = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsRetryExhausted"))
ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized"))
ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned"))
)

// member errors
var (
ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
ErrPreCheckCampaign = errors.Normalize("pre-check campaign failed", errors.RFCCodeText("PD:member:ErrPreCheckCampaign"))
)

// core errors
Expand Down
110 changes: 60 additions & 50 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGro
if kg == nil {
return nil
}
if kg.InSplit {
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
return m.store.DeleteKeyspaceGroup(txn, id)
Expand Down Expand Up @@ -176,17 +176,24 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
if oldKG != nil && !overwrite {
return ErrKeyspaceGroupExists
}
if oldKG != nil && oldKG.InSplit && overwrite {
if oldKG.IsSplitting() && overwrite {
return ErrKeyspaceGroupInSplit
}
m.store.SaveKeyspaceGroup(txn, &endpoint.KeyspaceGroup{
newKG := &endpoint.KeyspaceGroup{
ID: keyspaceGroup.ID,
UserKind: keyspaceGroup.UserKind,
Members: keyspaceGroup.Members,
Keyspaces: keyspaceGroup.Keyspaces,
InSplit: keyspaceGroup.InSplit,
SplitFrom: keyspaceGroup.SplitFrom,
})
}
if oldKG.IsSplitting() {
newKG.SplitState = &endpoint.SplitState{
SplitSource: oldKG.SplitState.SplitSource,
}
}
err = m.store.SaveKeyspaceGroup(txn, newKG)
if err != nil {
return err
}
}
return nil
})
Expand Down Expand Up @@ -230,7 +237,7 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
if kg == nil {
return errors.Errorf("keyspace group %d not found", id)
}
if kg.InSplit {
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
switch mutation {
Expand Down Expand Up @@ -276,7 +283,7 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
if newKG == nil {
return errors.Errorf("keyspace group %s not found in %s group", newGroupID, newUserKind)
}
if oldKG.InSplit || newKG.InSplit {
if oldKG.IsSplitting() || newKG.IsSplitting() {
return ErrKeyspaceGroupInSplit
}

Expand Down Expand Up @@ -308,40 +315,41 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse

// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(splitFromID, splitToID uint32, keyspaces []uint32) error {
var splitFromKg, splitToKg *endpoint.KeyspaceGroup
func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error {
var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
// TODO: avoid to split when the keyspaces is empty.
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the old keyspace group first.
splitFromKg, err = m.store.LoadKeyspaceGroup(txn, splitFromID)
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID)
if err != nil {
return err
}
if splitFromKg == nil {
if splitSourceKg == nil {
return ErrKeyspaceGroupNotFound
}
if splitFromKg.InSplit {
// A keyspace group can not take part in multiple split processes.
if splitSourceKg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
// Check if the new keyspace group already exists.
splitToKg, err = m.store.LoadKeyspaceGroup(txn, splitToID)
splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID)
if err != nil {
return err
}
if splitToKg != nil {
if splitTargetKg != nil {
return ErrKeyspaceGroupExists
}
// Check if the keyspaces are all in the old keyspace group.
if len(keyspaces) > len(splitFromKg.Keyspaces) {
if len(keyspaces) > len(splitSourceKg.Keyspaces) {
return ErrKeyspaceNotInKeyspaceGroup
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, len(splitFromKg.Keyspaces))
oldKeyspaceMap = make(map[uint32]struct{}, len(splitSourceKg.Keyspaces))
newKeyspaceMap = make(map[uint32]struct{}, len(keyspaces))
)
for _, keyspace := range splitFromKg.Keyspaces {
for _, keyspace := range splitSourceKg.Keyspaces {
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range keyspaces {
Expand All @@ -351,75 +359,77 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitFromID, splitToID uint32, key
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace group for the old keyspace group.
splitKeyspaces := make([]uint32, 0, len(splitFromKg.Keyspaces)-len(keyspaces))
for _, keyspace := range splitFromKg.Keyspaces {
splitKeyspaces := make([]uint32, 0, len(splitSourceKg.Keyspaces)-len(keyspaces))
for _, keyspace := range splitSourceKg.Keyspaces {
if _, ok := newKeyspaceMap[keyspace]; !ok {
splitKeyspaces = append(splitKeyspaces, keyspace)
}
}
// Update the old keyspace group.
splitFromKg.Keyspaces = splitKeyspaces
splitFromKg.InSplit = true
if err = m.store.SaveKeyspaceGroup(txn, splitFromKg); err != nil {
splitSourceKg.Keyspaces = splitKeyspaces
splitSourceKg.SplitState = &endpoint.SplitState{
SplitSource: splitSourceKg.ID,
}
if err = m.store.SaveKeyspaceGroup(txn, splitSourceKg); err != nil {
return err
}
splitToKg = &endpoint.KeyspaceGroup{
ID: splitToID,
splitTargetKg = &endpoint.KeyspaceGroup{
ID: splitTargetID,
// Keep the same user kind and members as the old keyspace group.
UserKind: splitFromKg.UserKind,
Members: splitFromKg.Members,
UserKind: splitSourceKg.UserKind,
Members: splitSourceKg.Members,
Keyspaces: keyspaces,
// Only set the new keyspace group in split state.
InSplit: true,
SplitFrom: splitFromKg.ID,
SplitState: &endpoint.SplitState{
SplitSource: splitSourceKg.ID,
},
}
// Create the new split keyspace group.
return m.store.SaveKeyspaceGroup(txn, splitToKg)
return m.store.SaveKeyspaceGroup(txn, splitTargetKg)
}); err != nil {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(splitFromKg.UserKind)].Put(splitFromKg)
m.groups[endpoint.StringUserKind(splitToKg.UserKind)].Put(splitToKg)
m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg)
m.groups[endpoint.StringUserKind(splitTargetKg.UserKind)].Put(splitTargetKg)
return nil
}

// FinishSplitKeyspaceByID finishes the split keyspace group by the split-to ID.
func (m *GroupManager) FinishSplitKeyspaceByID(splitToID uint32) error {
var splitToKg, splitFromKg *endpoint.KeyspaceGroup
// FinishSplitKeyspaceByID finishes the split keyspace group by the split target ID.
func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
var splitTargetKg, splitSourceKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the split-to keyspace group first.
splitToKg, err = m.store.LoadKeyspaceGroup(txn, splitToID)
// Load the split target keyspace group first.
splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID)
if err != nil {
return err
}
if splitToKg == nil {
if splitTargetKg == nil {
return ErrKeyspaceGroupNotFound
}
// Check if it's in the split state.
if !splitToKg.InSplit {
if !splitTargetKg.IsSplitTarget() {
return ErrKeyspaceGroupNotInSplit
}
// Load the split-from keyspace group then.
splitFromKg, err = m.store.LoadKeyspaceGroup(txn, splitToKg.SplitFrom)
// Load the split source keyspace group then.
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetKg.SplitSource())
if err != nil {
return err
}
if splitFromKg == nil {
if splitSourceKg == nil {
return ErrKeyspaceGroupNotFound
}
if !splitFromKg.InSplit {
if !splitSourceKg.IsSplitSource() {
return ErrKeyspaceGroupNotInSplit
}
splitToKg.InSplit = false
splitFromKg.InSplit = false
err = m.store.SaveKeyspaceGroup(txn, splitToKg)
splitTargetKg.SplitState = nil
splitSourceKg.SplitState = nil
err = m.store.SaveKeyspaceGroup(txn, splitTargetKg)
if err != nil {
return err
}
err = m.store.SaveKeyspaceGroup(txn, splitFromKg)
err = m.store.SaveKeyspaceGroup(txn, splitSourceKg)
if err != nil {
return err
}
Expand All @@ -428,7 +438,7 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitToID uint32) error {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(splitToKg.UserKind)].Put(splitToKg)
m.groups[endpoint.StringUserKind(splitFromKg.UserKind)].Put(splitFromKg)
m.groups[endpoint.StringUserKind(splitTargetKg.UserKind)].Put(splitTargetKg)
m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg)
return nil
}
18 changes: 8 additions & 10 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() {
re.NoError(err)
re.Equal(uint32(0), kg.ID)
re.Equal(endpoint.Basic.String(), kg.UserKind)
re.False(kg.InSplit)
re.False(kg.IsSplitting())
kg, err = suite.kgm.GetKeyspaceGroupByID(3)
re.NoError(err)
re.Equal(uint32(3), kg.ID)
re.Equal(endpoint.Standard.String(), kg.UserKind)
re.False(kg.InSplit)
re.False(kg.IsSplitting())
// remove the keyspace group 3
kg, err = suite.kgm.DeleteKeyspaceGroupByID(3)
re.NoError(err)
Expand Down Expand Up @@ -253,14 +253,14 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
re.NoError(err)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{111, 222}, kg2.Keyspaces)
re.True(kg2.InSplit)
re.Empty(kg2.SplitFrom)
re.True(kg2.IsSplitSource())
re.Equal(kg2.ID, kg2.SplitSource())
kg4, err := suite.kgm.GetKeyspaceGroupByID(4)
re.NoError(err)
re.Equal(uint32(4), kg4.ID)
re.Equal([]uint32{333}, kg4.Keyspaces)
re.True(kg4.InSplit)
re.Equal(kg2.ID, kg4.SplitFrom)
re.True(kg4.IsSplitTarget())
re.Equal(kg2.ID, kg4.SplitSource())
re.Equal(kg2.UserKind, kg4.UserKind)
re.Equal(kg2.Members, kg4.Members)

Expand Down Expand Up @@ -293,14 +293,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
re.NoError(err)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{111, 222}, kg2.Keyspaces)
re.False(kg2.InSplit)
re.Empty(kg2.SplitFrom)
re.False(kg2.IsSplitting())
kg4, err = suite.kgm.GetKeyspaceGroupByID(4)
re.NoError(err)
re.Equal(uint32(4), kg4.ID)
re.Equal([]uint32{333}, kg4.Keyspaces)
re.False(kg4.InSplit)
re.Equal(kg2.ID, kg4.SplitFrom)
re.False(kg4.IsSplitting())
re.Equal(kg2.UserKind, kg4.UserKind)
re.Equal(kg2.Members, kg4.Members)

Expand Down
13 changes: 7 additions & 6 deletions pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package server
import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/tso"
"go.uber.org/zap"
)
Expand All @@ -31,14 +30,16 @@ func newHandler(s *Server) *Handler {
return &Handler{s: s}
}

// ResetTS resets the ts with specified tso.
// TODO: Support multiple keyspace groups.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
// ResetTS resets the TSO with the specified one.
func (h *Handler) ResetTS(
ts uint64, ignoreSmaller, skipUpperBoundCheck bool, keyspaceGroupID uint32,
) error {
log.Info("reset-ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeyspaceGroupID)
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck),
zap.Uint32("keyspace-group-id", keyspaceGroupID))
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(keyspaceGroupID)
if err != nil {
log.Error("failed to get allocator manager", errs.ZapError(err))
return err
Expand Down
Loading

0 comments on commit e15b211

Please sign in to comment.