Skip to content

Commit

Permalink
Merge branch 'master' into add_split_from
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 12, 2023
2 parents 972fa65 + f6a4090 commit f35f3da
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 136 deletions.
6 changes: 3 additions & 3 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ type AllocatorManager struct {

ctx context.Context
cancel context.CancelFunc
// ksgID is the keyspace group id
ksgID uint32
// kgID is the keyspace group ID
kgID uint32
// member is for election use
member ElectionMember
// TSO config
Expand Down Expand Up @@ -204,7 +204,7 @@ func NewAllocatorManager(
am := &AllocatorManager{
ctx: ctx,
cancel: cancel,
ksgID: keyspaceGroupID,
kgID: keyspaceGroupID,
member: member,
rootPath: rootPath,
storage: storage,
Expand Down
223 changes: 125 additions & 98 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

perrors "github.com/pingcap/errors"
Expand Down Expand Up @@ -55,20 +54,83 @@ const (
watchKEtcdChangeRetryInterval = 1 * time.Second
)

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
type KeyspaceGroupManager struct {
type state struct {
sync.RWMutex
// ams stores the allocator managers of the keyspace groups. Each keyspace group is
// assigned with an allocator manager managing its global/local tso allocators.
// Use a fixed size array to maximize the efficiency of concurrent access to
// different keyspace groups for tso service.
ams [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[AllocatorManager]
ams [mcsutils.MaxKeyspaceGroupCountInUse]*AllocatorManager
// kgs stores the keyspace groups' membership/distribution meta.
kgs [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[endpoint.KeyspaceGroup]
// keyspaceLookupTable is a map from keyspace (id) to its keyspace group (id).
// stored as map[uint32]uint32
keyspaceLookupTable sync.Map
kgs [mcsutils.MaxKeyspaceGroupCountInUse]*endpoint.KeyspaceGroup
// keyspaceLookupTable is a map from keyspace to the keyspace group to which it belongs.
keyspaceLookupTable map[uint32]uint32
}

func (s *state) initialize() {
s.keyspaceLookupTable = make(map[uint32]uint32)
}

func (s *state) deinitialize() {
log.Info("closing all keyspace groups")

s.Lock()
defer s.Unlock()

wg := sync.WaitGroup{}
for _, am := range s.ams {
if am != nil {
wg.Add(1)
go func(am *AllocatorManager) {
defer wg.Done()
am.close()
log.Info("keyspace group closed", zap.Uint32("keyspace-group-id", am.kgID))
}(am)
}
}
wg.Wait()

log.Info("all keyspace groups closed")
}

// getAllocatorManager returns the AllocatorManager of the given keyspace group
func (s *state) getAllocatorManager(group uint32) *AllocatorManager {
s.RLock()
defer s.RUnlock()
return s.ams[group]
}

// getAMWithMembershipCheck returns the AllocatorManager of the given keyspace group and check
// if the keyspace is served by this keyspace group.
func (s *state) getAMWithMembershipCheck(
keyspaceID, keyspaceGroupID uint32,
) (*AllocatorManager, uint32, error) {
s.RLock()
defer s.RUnlock()

if am := s.ams[keyspaceGroupID]; am != nil {
kg := s.kgs[keyspaceGroupID]
if kg != nil {
if _, ok := kg.KeyspaceLookupTable[keyspaceID]; ok {
return am, keyspaceGroupID, nil
}
}
}

// The keyspace doesn't belong to this keyspace group, we should check if it belongs to any other
// keyspace groups, and return the correct keyspace group ID to the client.
if kgid, ok := s.keyspaceLookupTable[keyspaceID]; ok {
return nil, kgid, genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}
return nil, keyspaceGroupID, errs.ErrKeyspaceNotAssigned.FastGenByArgs(keyspaceID)
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
type KeyspaceGroupManager struct {
// state is the in-memory state of the keyspace groups
state

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -149,11 +211,11 @@ func NewKeyspaceGroupManager(
loadKeyspaceGroupsBatchSize: defaultLoadKeyspaceGroupsBatchSize,
loadFromEtcdMaxRetryTimes: defaultLoadFromEtcdMaxRetryTimes,
}

kgm.legacySvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
kgm.tsoSvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil)
kgm.state.initialize()
return kgm
}

Expand Down Expand Up @@ -201,30 +263,11 @@ func (kgm *KeyspaceGroupManager) Close() {
// added/initialized after that.
kgm.cancel()
kgm.wg.Wait()
kgm.closeKeyspaceGroups()
kgm.state.deinitialize()

log.Info("keyspace group manager closed")
}

func (kgm *KeyspaceGroupManager) closeKeyspaceGroups() {
log.Info("closing all keyspace groups")

wg := sync.WaitGroup{}
for i := range kgm.ams {
if am := kgm.ams[i].Load(); am != nil {
wg.Add(1)
go func(am *AllocatorManager) {
defer wg.Done()
am.close()
log.Info("keyspace group closed", zap.Uint32("keyspace-group-id", am.ksgID))
}(am)
}
}
wg.Wait()

log.Info("All keyspace groups closed")
}

func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel context.CancelFunc, done chan struct{}) {
select {
case <-done:
Expand Down Expand Up @@ -439,22 +482,19 @@ func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) b
// updateKeyspaceGroup applies the given keyspace group. If the keyspace group is just assigned to
// this host/pod, it will join the primary election.
func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGroup) {
if group.ID >= uint32(len(kgm.ams)) {
log.Warn("keyspace group ID is out of range, ignore it",
zap.Uint32("keyspace-group-id", group.ID), zap.Int("max-keyspace-group-id", len(kgm.ams)-1))
if err := kgm.checkKeySpaceGroupID(group.ID); err != nil {
log.Warn("keyspace group ID is invalid, ignore it", zap.Error(err))
return
}

assignedToMe := kgm.isAssignedToMe(group)
if assignedToMe {
if kgm.ams[group.ID].Load() != nil {
if kgm.ams[group.ID] != nil {
log.Info("keyspace group already initialized, so update meta only",
zap.Uint32("keyspace-group-id", group.ID))

oldGroup := kgm.kgs[group.ID].Load()
group.KeyspaceLookupTable = kgm.updateKeyspaceGroupMembership(
group.ID, oldGroup.Keyspaces, group.Keyspaces, oldGroup.KeyspaceLookupTable)
kgm.kgs[group.ID].Store(group)
oldGroup := kgm.kgs[group.ID]
kgm.updateKeyspaceGroupMembership(oldGroup, group)
return
}

Expand Down Expand Up @@ -484,9 +524,17 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
storage = kgm.tsoSvcStorage
}

group.KeyspaceLookupTable = kgm.buildKeyspaceLookupTable(group.ID, group.Keyspaces)
kgm.kgs[group.ID].Store(group)
kgm.ams[group.ID].Store(NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true))
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)

kgm.Lock()
group.KeyspaceLookupTable = make(map[uint32]struct{})
for _, kid := range group.Keyspaces {
group.KeyspaceLookupTable[kid] = struct{}{}
kgm.keyspaceLookupTable[kid] = group.ID
}
kgm.kgs[group.ID] = group
kgm.ams[group.ID] = am
kgm.Unlock()
} else {
// Not assigned to me. If this host/pod owns this keyspace group, it should resign.
kgm.deleteKeyspaceGroup(group.ID)
Expand All @@ -495,11 +543,11 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro

// updateKeyspaceGroupMembership updates the keyspace lookup table for the given keyspace group.
func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
groupID uint32, oldKeyspaces, newKeyspaces []uint32,
oldKeyspaceLookupTable map[uint32]struct{},
) map[uint32]struct{} {
oldLen := len(oldKeyspaces)
newLen := len(newKeyspaces)
oldGroup, newGroup *endpoint.KeyspaceGroup,
) {
groupID := newGroup.ID
oldKeyspaces, newKeyspaces := oldGroup.Keyspaces, newGroup.Keyspaces
oldLen, newLen := len(oldKeyspaces), len(newKeyspaces)

// Sort the keyspaces in ascending order
sort.Slice(newKeyspaces, func(i, j int) bool {
Expand All @@ -519,53 +567,56 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
}
}

var newKeyspaceLookupTable map[uint32]struct{}
kgm.Lock()
defer kgm.Unlock()

if sameMembership {
// The keyspace group membership is not changed, so we reuse the old one.
newKeyspaceLookupTable = oldKeyspaceLookupTable
newGroup.KeyspaceLookupTable = oldGroup.KeyspaceLookupTable
} else {
// The keyspace group membership is changed, so we update the keyspace lookup table.
newKeyspaceLookupTable = make(map[uint32]struct{})
newGroup.KeyspaceLookupTable = make(map[uint32]struct{})
for i, j := 0, 0; i < oldLen || j < newLen; {
if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] {
newKeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
i++
j++
} else if i < oldLen && j < newLen && oldKeyspaces[i] < newKeyspaces[j] || j == newLen {
kgm.keyspaceLookupTable.Delete(oldKeyspaces[i])
delete(kgm.keyspaceLookupTable, oldKeyspaces[i])
i++
} else {
newKeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
kgm.keyspaceLookupTable.Store(newKeyspaces[j], groupID)
newGroup.KeyspaceLookupTable[newKeyspaces[j]] = struct{}{}
kgm.keyspaceLookupTable[newKeyspaces[j]] = groupID
j++
}
}
}

return newKeyspaceLookupTable
}

func (kgm *KeyspaceGroupManager) buildKeyspaceLookupTable(groupID uint32, keyspaces []uint32) map[uint32]struct{} {
keyspaceLookupTable := make(map[uint32]struct{})
for _, kid := range keyspaces {
keyspaceLookupTable[kid] = struct{}{}
kgm.keyspaceLookupTable.Store(kid, groupID)
}
return keyspaceLookupTable
kgm.kgs[groupID] = newGroup
}

// deleteKeyspaceGroup deletes the given keyspace group.
func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
kg := kgm.kgs[groupID].Swap(nil)
kgm.Lock()
defer kgm.Unlock()

kg := kgm.kgs[groupID]
if kg != nil {
for _, kid := range kg.Keyspaces {
kgm.keyspaceLookupTable.CompareAndDelete(kid, kg.ID)
// if kid == kg.ID, it means the keyspace still belongs to this keyspace group,
// so we decouple the relationship in the global keyspace lookup table.
// if kid != kg.ID, it means the keyspace has been moved to another keyspace group
// which has already declared the ownership of the keyspace.
if kid == kg.ID {
delete(kgm.keyspaceLookupTable, kid)
}
}
kgm.kgs[groupID] = nil
}
am := kgm.ams[groupID].Swap(nil)

am := kgm.ams[groupID]
if am != nil {
am.close()
kgm.ams[groupID] = nil
}

log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", groupID))
Expand All @@ -576,28 +627,10 @@ func (kgm *KeyspaceGroupManager) GetAllocatorManager(keyspaceGroupID uint32) (*A
if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
return nil, err
}
if am := kgm.ams[keyspaceGroupID].Load(); am != nil {
if am := kgm.state.getAllocatorManager(keyspaceGroupID); am != nil {
return am, nil
}
return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}

// GetAMWithMembershipCheck returns the AllocatorManager of the given keyspace group and check if the keyspace
// is served by this keyspace group.
func (kgm *KeyspaceGroupManager) GetAMWithMembershipCheck(
keyspaceID, keyspaceGroupID uint32,
) (*AllocatorManager, error) {
if am := kgm.ams[keyspaceGroupID].Load(); am != nil {
ksg := kgm.kgs[keyspaceGroupID].Load()
if ksg == nil {
return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}
if _, ok := ksg.KeyspaceLookupTable[keyspaceID]; !ok {
return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}
return am, nil
}
return nil, kgm.genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
return nil, genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID)
}

// GetElectionMember returns the election member of the given keyspace group
Expand All @@ -607,7 +640,7 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
return nil, err
}
am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID)
am, _, err := kgm.state.getAMWithMembershipCheck(keyspaceID, keyspaceGroupID)
if err != nil {
return nil, err
}
Expand All @@ -622,15 +655,9 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err := kgm.checkKeySpaceGroupID(keyspaceGroupID); err != nil {
return pdpb.Timestamp{}, keyspaceGroupID, err
}
am, err := kgm.GetAMWithMembershipCheck(keyspaceID, keyspaceGroupID)
am, currentKeyspaceGroupID, err := kgm.state.getAMWithMembershipCheck(keyspaceID, keyspaceGroupID)
if err != nil {
// The keyspace doesn't belong to this keyspace group, we should check if it belongs to any other
// keyspace groups, and return the correct keyspace group ID to the client.
kgid, loaded := kgm.keyspaceLookupTable.Load(keyspaceID)
if loaded && kgid != nil {
return pdpb.Timestamp{}, kgid.(uint32), err
}
return pdpb.Timestamp{}, keyspaceGroupID, errs.ErrKeyspaceNotAssigned.FastGenByArgs(keyspaceID)
return pdpb.Timestamp{}, currentKeyspaceGroupID, err
}
ts, err = am.HandleRequest(dcLocation, count)
return ts, keyspaceGroupID, err
Expand All @@ -644,7 +671,7 @@ func (kgm *KeyspaceGroupManager) checkKeySpaceGroupID(id uint32) error {
fmt.Sprintf("%d shouldn't >= %d", id, mcsutils.MaxKeyspaceGroupCountInUse))
}

func (kgm *KeyspaceGroupManager) genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error {
func genNotServedErr(perr *perrors.Error, keyspaceGroupID uint32) error {
return perr.FastGenByArgs(
fmt.Sprintf(
"requested keyspace group with id %d %s by this host/pod",
Expand Down
Loading

0 comments on commit f35f3da

Please sign in to comment.