Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso, member: support TSO split based on keyspace group split #6313

Merged
merged 8 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions errors.toml
Original file line number Diff line number Diff line change
@@ -1,21 +1,6 @@
# AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen
# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER.

["ErrLoadKeyspaceGroupsRetryExhausted"]
error = '''
load keyspace groups retry exhausted, %s
'''

["ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace groups terminated
'''

["ErrLoadKeyspaceGroupsTimeout"]
error = '''
load keyspace groups timeout
'''

["PD:ErrEncryptionKMS"]
error = '''
KMS error
Expand Down Expand Up @@ -506,6 +491,11 @@ error = '''
marshal leader failed
'''

["PD:member:ErrPreCheckCampaign"]
error = '''
pre-check campaign failed
'''

["PD:netstat:ErrNetstatTCPSocks"]
error = '''
TCP socks error
Expand Down Expand Up @@ -761,11 +751,31 @@ error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrKeyspaceGroupNotInitialized"]
error = '''
the keyspace group %d isn't initialized
'''

["PD:tso:ErrKeyspaceNotAssigned"]
error = '''
the keyspace %d isn't assigned to any keyspace group
'''

["PD:tso:ErrLoadKeyspaceGroupsRetryExhausted"]
error = '''
load keyspace groups retry exhausted, %s
'''

["PD:tso:ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace groups terminated
'''

["PD:tso:ErrLoadKeyspaceGroupsTimeout"]
error = '''
load keyspace groups timeout
'''

["PD:tso:ErrLogicOverflow"]
error = '''
logic part overflow
Expand Down
8 changes: 5 additions & 3 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,18 @@ var (
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated"))
ErrLoadKeyspaceGroupsRetryExhausted = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("ErrLoadKeyspaceGroupsRetryExhausted"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsTerminated"))
ErrLoadKeyspaceGroupsRetryExhausted = errors.Normalize("load keyspace groups retry exhausted, %s", errors.RFCCodeText("PD:tso:ErrLoadKeyspaceGroupsRetryExhausted"))
ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized"))
ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned"))
)

// member errors
var (
ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
ErrPreCheckCampaign = errors.Normalize("pre-check campaign failed", errors.RFCCodeText("PD:member:ErrPreCheckCampaign"))
)

// core errors
Expand Down
110 changes: 60 additions & 50 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGro
if kg == nil {
return nil
}
if kg.InSplit {
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
return m.store.DeleteKeyspaceGroup(txn, id)
Expand Down Expand Up @@ -176,17 +176,24 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
if oldKG != nil && !overwrite {
return ErrKeyspaceGroupExists
}
if oldKG != nil && oldKG.InSplit && overwrite {
if oldKG.IsSplitting() && overwrite {
return ErrKeyspaceGroupInSplit
}
m.store.SaveKeyspaceGroup(txn, &endpoint.KeyspaceGroup{
newKG := &endpoint.KeyspaceGroup{
ID: keyspaceGroup.ID,
UserKind: keyspaceGroup.UserKind,
Members: keyspaceGroup.Members,
Keyspaces: keyspaceGroup.Keyspaces,
InSplit: keyspaceGroup.InSplit,
SplitFrom: keyspaceGroup.SplitFrom,
})
}
if oldKG.IsSplitting() {
newKG.SplitState = &endpoint.SplitState{
SplitSource: oldKG.SplitState.SplitSource,
}
}
err = m.store.SaveKeyspaceGroup(txn, newKG)
if err != nil {
return err
}
}
return nil
})
Expand Down Expand Up @@ -230,7 +237,7 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
if kg == nil {
return errors.Errorf("keyspace group %d not found", id)
}
if kg.InSplit {
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
switch mutation {
Expand Down Expand Up @@ -276,7 +283,7 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
if newKG == nil {
return errors.Errorf("keyspace group %s not found in %s group", newGroupID, newUserKind)
}
if oldKG.InSplit || newKG.InSplit {
if oldKG.IsSplitting() || newKG.IsSplitting() {
return ErrKeyspaceGroupInSplit
}

Expand Down Expand Up @@ -308,40 +315,41 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse

// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(splitFromID, splitToID uint32, keyspaces []uint32) error {
var splitFromKg, splitToKg *endpoint.KeyspaceGroup
func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error {
var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
// TODO: avoid to split when the keyspaces is empty.
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the old keyspace group first.
splitFromKg, err = m.store.LoadKeyspaceGroup(txn, splitFromID)
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID)
if err != nil {
return err
}
if splitFromKg == nil {
if splitSourceKg == nil {
return ErrKeyspaceGroupNotFound
}
if splitFromKg.InSplit {
// A keyspace group can not take part in multiple split processes.
if splitSourceKg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
// Check if the new keyspace group already exists.
splitToKg, err = m.store.LoadKeyspaceGroup(txn, splitToID)
splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID)
if err != nil {
return err
}
if splitToKg != nil {
if splitTargetKg != nil {
return ErrKeyspaceGroupExists
}
// Check if the keyspaces are all in the old keyspace group.
if len(keyspaces) > len(splitFromKg.Keyspaces) {
if len(keyspaces) > len(splitSourceKg.Keyspaces) {
return ErrKeyspaceNotInKeyspaceGroup
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, len(splitFromKg.Keyspaces))
oldKeyspaceMap = make(map[uint32]struct{}, len(splitSourceKg.Keyspaces))
newKeyspaceMap = make(map[uint32]struct{}, len(keyspaces))
)
for _, keyspace := range splitFromKg.Keyspaces {
for _, keyspace := range splitSourceKg.Keyspaces {
oldKeyspaceMap[keyspace] = struct{}{}
}
for _, keyspace := range keyspaces {
Expand All @@ -351,75 +359,77 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitFromID, splitToID uint32, key
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace group for the old keyspace group.
splitKeyspaces := make([]uint32, 0, len(splitFromKg.Keyspaces)-len(keyspaces))
for _, keyspace := range splitFromKg.Keyspaces {
splitKeyspaces := make([]uint32, 0, len(splitSourceKg.Keyspaces)-len(keyspaces))
for _, keyspace := range splitSourceKg.Keyspaces {
if _, ok := newKeyspaceMap[keyspace]; !ok {
splitKeyspaces = append(splitKeyspaces, keyspace)
}
}
// Update the old keyspace group.
splitFromKg.Keyspaces = splitKeyspaces
splitFromKg.InSplit = true
if err = m.store.SaveKeyspaceGroup(txn, splitFromKg); err != nil {
splitSourceKg.Keyspaces = splitKeyspaces
splitSourceKg.SplitState = &endpoint.SplitState{
SplitSource: splitSourceKg.ID,
}
if err = m.store.SaveKeyspaceGroup(txn, splitSourceKg); err != nil {
return err
}
splitToKg = &endpoint.KeyspaceGroup{
ID: splitToID,
splitTargetKg = &endpoint.KeyspaceGroup{
ID: splitTargetID,
// Keep the same user kind and members as the old keyspace group.
UserKind: splitFromKg.UserKind,
Members: splitFromKg.Members,
UserKind: splitSourceKg.UserKind,
Members: splitSourceKg.Members,
Keyspaces: keyspaces,
// Only set the new keyspace group in split state.
InSplit: true,
SplitFrom: splitFromKg.ID,
SplitState: &endpoint.SplitState{
SplitSource: splitSourceKg.ID,
},
}
// Create the new split keyspace group.
return m.store.SaveKeyspaceGroup(txn, splitToKg)
return m.store.SaveKeyspaceGroup(txn, splitTargetKg)
}); err != nil {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(splitFromKg.UserKind)].Put(splitFromKg)
m.groups[endpoint.StringUserKind(splitToKg.UserKind)].Put(splitToKg)
m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg)
m.groups[endpoint.StringUserKind(splitTargetKg.UserKind)].Put(splitTargetKg)
return nil
}

// FinishSplitKeyspaceByID finishes the split keyspace group by the split-to ID.
func (m *GroupManager) FinishSplitKeyspaceByID(splitToID uint32) error {
var splitToKg, splitFromKg *endpoint.KeyspaceGroup
// FinishSplitKeyspaceByID finishes the split keyspace group by the split target ID.
func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
var splitTargetKg, splitSourceKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the split-to keyspace group first.
splitToKg, err = m.store.LoadKeyspaceGroup(txn, splitToID)
// Load the split target keyspace group first.
splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID)
if err != nil {
return err
}
if splitToKg == nil {
if splitTargetKg == nil {
return ErrKeyspaceGroupNotFound
}
// Check if it's in the split state.
if !splitToKg.InSplit {
if !splitTargetKg.IsSplitTarget() {
return ErrKeyspaceGroupNotInSplit
}
// Load the split-from keyspace group then.
splitFromKg, err = m.store.LoadKeyspaceGroup(txn, splitToKg.SplitFrom)
// Load the split source keyspace group then.
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetKg.SplitSource())
if err != nil {
return err
}
if splitFromKg == nil {
if splitSourceKg == nil {
return ErrKeyspaceGroupNotFound
}
if !splitFromKg.InSplit {
if !splitSourceKg.IsSplitSource() {
return ErrKeyspaceGroupNotInSplit
}
splitToKg.InSplit = false
splitFromKg.InSplit = false
err = m.store.SaveKeyspaceGroup(txn, splitToKg)
splitTargetKg.SplitState = nil
splitSourceKg.SplitState = nil
err = m.store.SaveKeyspaceGroup(txn, splitTargetKg)
if err != nil {
return err
}
err = m.store.SaveKeyspaceGroup(txn, splitFromKg)
err = m.store.SaveKeyspaceGroup(txn, splitSourceKg)
if err != nil {
return err
}
Expand All @@ -428,7 +438,7 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitToID uint32) error {
return err
}
// Update the keyspace group cache.
m.groups[endpoint.StringUserKind(splitToKg.UserKind)].Put(splitToKg)
m.groups[endpoint.StringUserKind(splitFromKg.UserKind)].Put(splitFromKg)
m.groups[endpoint.StringUserKind(splitTargetKg.UserKind)].Put(splitTargetKg)
m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg)
return nil
}
18 changes: 8 additions & 10 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() {
re.NoError(err)
re.Equal(uint32(0), kg.ID)
re.Equal(endpoint.Basic.String(), kg.UserKind)
re.False(kg.InSplit)
re.False(kg.IsSplitting())
kg, err = suite.kgm.GetKeyspaceGroupByID(3)
re.NoError(err)
re.Equal(uint32(3), kg.ID)
re.Equal(endpoint.Standard.String(), kg.UserKind)
re.False(kg.InSplit)
re.False(kg.IsSplitting())
// remove the keyspace group 3
kg, err = suite.kgm.DeleteKeyspaceGroupByID(3)
re.NoError(err)
Expand Down Expand Up @@ -253,14 +253,14 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
re.NoError(err)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{111, 222}, kg2.Keyspaces)
re.True(kg2.InSplit)
re.Empty(kg2.SplitFrom)
re.True(kg2.IsSplitSource())
re.Equal(kg2.ID, kg2.SplitSource())
kg4, err := suite.kgm.GetKeyspaceGroupByID(4)
re.NoError(err)
re.Equal(uint32(4), kg4.ID)
re.Equal([]uint32{333}, kg4.Keyspaces)
re.True(kg4.InSplit)
re.Equal(kg2.ID, kg4.SplitFrom)
re.True(kg4.IsSplitTarget())
re.Equal(kg2.ID, kg4.SplitSource())
re.Equal(kg2.UserKind, kg4.UserKind)
re.Equal(kg2.Members, kg4.Members)

Expand Down Expand Up @@ -293,14 +293,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
re.NoError(err)
re.Equal(uint32(2), kg2.ID)
re.Equal([]uint32{111, 222}, kg2.Keyspaces)
re.False(kg2.InSplit)
re.Empty(kg2.SplitFrom)
re.False(kg2.IsSplitting())
kg4, err = suite.kgm.GetKeyspaceGroupByID(4)
re.NoError(err)
re.Equal(uint32(4), kg4.ID)
re.Equal([]uint32{333}, kg4.Keyspaces)
re.False(kg4.InSplit)
re.Equal(kg2.ID, kg4.SplitFrom)
re.False(kg4.IsSplitting())
re.Equal(kg2.UserKind, kg4.UserKind)
re.Equal(kg2.Members, kg4.Members)

Expand Down
13 changes: 7 additions & 6 deletions pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package server
import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/tso"
"go.uber.org/zap"
)
Expand All @@ -31,14 +30,16 @@ func newHandler(s *Server) *Handler {
return &Handler{s: s}
}

// ResetTS resets the ts with specified tso.
// TODO: Support multiple keyspace groups.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
// ResetTS resets the TSO with the specified one.
func (h *Handler) ResetTS(
ts uint64, ignoreSmaller, skipUpperBoundCheck bool, keyspaceGroupID uint32,
) error {
log.Info("reset-ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeyspaceGroupID)
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck),
zap.Uint32("keyspace-group-id", keyspaceGroupID))
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(keyspaceGroupID)
if err != nil {
log.Error("failed to get allocator manager", errs.ZapError(err))
return err
Expand Down
Loading