-
Notifications
You must be signed in to change notification settings - Fork 721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bootstrap default keyspace group in the tso service #6306
Changes from all commits
18fae33
0624827
71ccdda
c785844
e88e809
a6fc397
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,7 +124,15 @@ func (s *state) getAMWithMembershipCheck( | |
if kgid, ok := s.keyspaceLookupTable[keyspaceID]; ok { | ||
return nil, kgid, genNotServedErr(errs.ErrGetAllocatorManager, keyspaceGroupID) | ||
} | ||
return nil, keyspaceGroupID, errs.ErrKeyspaceNotAssigned.FastGenByArgs(keyspaceID) | ||
|
||
if keyspaceGroupID != mcsutils.DefaultKeyspaceGroupID { | ||
return nil, keyspaceGroupID, errs.ErrKeyspaceNotAssigned.FastGenByArgs(keyspaceID) | ||
} | ||
|
||
// The keyspace doesn't belong to any keyspace group, so return the default keyspace group. | ||
// It's for migrating the existing keyspaces which have no keyspace group assigned, so the | ||
// the default keyspace group is used to serve the keyspaces. | ||
return s.ams[mcsutils.DefaultKeyspaceGroupID], mcsutils.DefaultKeyspaceGroupID, nil | ||
} | ||
|
||
// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host. | ||
|
@@ -222,23 +230,12 @@ func NewKeyspaceGroupManager( | |
} | ||
|
||
// Initialize this KeyspaceGroupManager | ||
func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { | ||
// Initialize the default keyspace group if not loading from storage | ||
if !loadFromStorage { | ||
group := &endpoint.KeyspaceGroup{ | ||
ID: mcsutils.DefaultKeySpaceGroupID, | ||
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, | ||
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID}, | ||
} | ||
kgm.updateKeyspaceGroup(group) | ||
return nil | ||
} | ||
|
||
func (kgm *KeyspaceGroupManager) Initialize() error { | ||
// Load the initial keyspace group assignment from storage with time limit | ||
done := make(chan struct{}, 1) | ||
ctx, cancel := context.WithCancel(kgm.ctx) | ||
go kgm.checkInitProgress(ctx, cancel, done) | ||
watchStartRevision, err := kgm.initAssignment(ctx) | ||
watchStartRevision, defaultKGConfigured, err := kgm.initAssignment(ctx) | ||
done <- struct{}{} | ||
if err != nil { | ||
log.Error("failed to initialize keyspace group manager", errs.ZapError(err)) | ||
|
@@ -247,6 +244,12 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { | |
return err | ||
} | ||
|
||
// Initialize the default keyspace group if it isn't configured in the storage. | ||
if !defaultKGConfigured { | ||
keyspaces := []uint32{mcsutils.DefaultKeyspaceID} | ||
kgm.initDefaultKeysapceGroup(keyspaces) | ||
} | ||
|
||
// Watch/apply keyspace group membership/distribution meta changes dynamically. | ||
kgm.wg.Add(1) | ||
go kgm.startKeyspaceGroupsMetaWatchLoop(watchStartRevision) | ||
|
@@ -284,14 +287,26 @@ func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel c | |
<-done | ||
} | ||
|
||
func (kgm *KeyspaceGroupManager) initDefaultKeysapceGroup(keyspaces []uint32) { | ||
log.Info("initializing default keyspace group", | ||
zap.Int("keyspaces-length", len(keyspaces))) | ||
|
||
group := &endpoint.KeyspaceGroup{ | ||
ID: mcsutils.DefaultKeyspaceGroupID, | ||
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}, | ||
Keyspaces: keyspaces, | ||
} | ||
kgm.updateKeyspaceGroup(group) | ||
} | ||
|
||
// initAssignment loads initial keyspace group assignment from storage and initialize the group manager. | ||
func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, error) { | ||
// Return watchStartRevision, the start revision for watching keyspace group membership/distribution change. | ||
func (kgm *KeyspaceGroupManager) initAssignment( | ||
ctx context.Context, | ||
) (watchStartRevision int64, defaultKGConfigured bool, err error) { | ||
var ( | ||
// The start revision for watching keyspace group membership/distribution change | ||
watchStartRevision int64 | ||
groups []*endpoint.KeyspaceGroup | ||
more bool | ||
err error | ||
keyspaceGroupsLoaded uint32 | ||
revision int64 | ||
) | ||
|
@@ -300,7 +315,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err | |
for { | ||
revision, groups, more, err = kgm.loadKeyspaceGroups(ctx, keyspaceGroupsLoaded, kgm.loadKeyspaceGroupsBatchSize) | ||
if err != nil { | ||
return 0, err | ||
return | ||
} | ||
|
||
keyspaceGroupsLoaded += uint32(len(groups)) | ||
|
@@ -313,10 +328,15 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err | |
for _, group := range groups { | ||
select { | ||
case <-ctx.Done(): | ||
return watchStartRevision, errs.ErrLoadKeyspaceGroupsTerminated | ||
err = errs.ErrLoadKeyspaceGroupsTerminated | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can return nil here instead of an error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can't return nil. Returning an error here indicates the caller KeyspaceGroupManager.Initialize(), the entire initialization process, should return immediately instead of continuing the other initialization work. Please check the logic after kgm.initAssignment(ctx) returns to see if there is a better way to do it.
|
||
return | ||
default: | ||
} | ||
|
||
if group.ID == mcsutils.DefaultKeyspaceGroupID { | ||
defaultKGConfigured = true | ||
} | ||
|
||
kgm.updateKeyspaceGroup(group) | ||
} | ||
|
||
|
@@ -326,7 +346,7 @@ func (kgm *KeyspaceGroupManager) initAssignment(ctx context.Context) (int64, err | |
} | ||
|
||
log.Info("loaded keyspace groups", zap.Uint32("keyspace-groups-loaded", keyspaceGroupsLoaded)) | ||
return watchStartRevision, nil | ||
return | ||
} | ||
|
||
// loadKeyspaceGroups loads keyspace groups from the start ID with limit. | ||
|
@@ -441,7 +461,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( | |
return revision, wresp.Err() | ||
} | ||
for _, event := range wresp.Events { | ||
id, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key)) | ||
groupID, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(event.Kv.Key)) | ||
if err != nil { | ||
log.Warn("failed to extract keyspace group ID from the key path", | ||
zap.String("key-path", string(event.Kv.Key)), zap.Error(err)) | ||
|
@@ -453,12 +473,20 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( | |
group := &endpoint.KeyspaceGroup{} | ||
if err := json.Unmarshal(event.Kv.Value, group); err != nil { | ||
log.Warn("failed to unmarshal keyspace group", | ||
zap.Uint32("keysapce-group-id", id), | ||
zap.Uint32("keysapce-group-id", groupID), | ||
zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause())) | ||
} | ||
kgm.updateKeyspaceGroup(group) | ||
case clientv3.EventTypeDelete: | ||
kgm.deleteKeyspaceGroup(id) | ||
if groupID == mcsutils.DefaultKeyspaceGroupID { | ||
keyspaces := kgm.kgs[groupID].Keyspaces | ||
kgm.deleteKeyspaceGroup(groupID) | ||
log.Warn("removed default keyspace group meta config from the storage. " + | ||
"now every tso node/pod will initialize it") | ||
kgm.initDefaultKeysapceGroup(keyspaces) | ||
} else { | ||
kgm.deleteKeyspaceGroup(groupID) | ||
} | ||
} | ||
} | ||
revision = wresp.Header.Revision | ||
|
@@ -473,6 +501,11 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( | |
} | ||
|
||
func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool { | ||
// If the default keyspace group isn't assigned to any tso node/pod, assign it to everyone. | ||
if group.ID == mcsutils.DefaultKeyspaceGroupID && len(group.Members) == 0 { | ||
return true | ||
} | ||
|
||
for _, member := range group.Members { | ||
if member.Address == kgm.tsoServiceID.ServiceAddr { | ||
return true | ||
|
@@ -516,7 +549,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro | |
tsRootPath string | ||
storage *endpoint.StorageEndpoint | ||
) | ||
if group.ID == mcsutils.DefaultKeySpaceGroupID { | ||
if group.ID == mcsutils.DefaultKeyspaceGroupID { | ||
tsRootPath = kgm.legacySvcRootPath | ||
storage = kgm.legacySvcStorage | ||
} else { | ||
|
@@ -536,6 +569,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro | |
kgm.ams[group.ID] = am | ||
kgm.Unlock() | ||
} else { | ||
if group.ID == mcsutils.DefaultKeyspaceGroupID { | ||
log.Info("resign default keyspace group membership", | ||
zap.Any("default-keyspace-group", group)) | ||
} | ||
// Not assigned to me. If this host/pod owns this keyspace group, it should resign. | ||
kgm.deleteKeyspaceGroup(group.ID) | ||
} | ||
|
@@ -554,7 +591,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( | |
return newKeyspaces[i] < newKeyspaces[j] | ||
}) | ||
|
||
// Mostly, the membership has no change, so we optimize for this case. | ||
// Mostly, the membership has no change, so optimize for this case. | ||
sameMembership := true | ||
if oldLen != newLen { | ||
sameMembership = false | ||
|
@@ -571,10 +608,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( | |
defer kgm.Unlock() | ||
|
||
if sameMembership { | ||
// The keyspace group membership is not changed, so we reuse the old one. | ||
// The keyspace group membership is not changed. Reuse the old one. | ||
newGroup.KeyspaceLookupTable = oldGroup.KeyspaceLookupTable | ||
} else { | ||
// The keyspace group membership is changed, so we update the keyspace lookup table. | ||
// The keyspace group membership is changed. Update the keyspace lookup table. | ||
newGroup.KeyspaceLookupTable = make(map[uint32]struct{}) | ||
for i, j := 0, 0; i < oldLen || j < newLen; { | ||
if i < oldLen && j < newLen && oldKeyspaces[i] == newKeyspaces[j] { | ||
|
@@ -590,12 +627,31 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( | |
j++ | ||
} | ||
} | ||
if groupID == mcsutils.DefaultKeyspaceGroupID { | ||
if _, ok := newGroup.KeyspaceLookupTable[mcsutils.DefaultKeyspaceID]; !ok { | ||
log.Warn("default keyspace is not in default keyspace group. add it back") | ||
kgm.keyspaceLookupTable[mcsutils.DefaultKeyspaceID] = groupID | ||
newGroup.KeyspaceLookupTable[mcsutils.DefaultKeyspaceID] = struct{}{} | ||
newGroup.Keyspaces = make([]uint32, 1+len(newKeyspaces)) | ||
newGroup.Keyspaces[0] = mcsutils.DefaultKeyspaceID | ||
copy(newGroup.Keyspaces[1:], newKeyspaces) | ||
} | ||
} else { | ||
if _, ok := newGroup.KeyspaceLookupTable[mcsutils.DefaultKeyspaceID]; ok { | ||
log.Warn("default keyspace is in non-default keyspace group. remove it") | ||
kgm.keyspaceLookupTable[mcsutils.DefaultKeyspaceID] = mcsutils.DefaultKeyspaceGroupID | ||
delete(newGroup.KeyspaceLookupTable, mcsutils.DefaultKeyspaceID) | ||
newGroup.Keyspaces = newKeyspaces[1:] | ||
} | ||
} | ||
} | ||
kgm.kgs[groupID] = newGroup | ||
} | ||
|
||
// deleteKeyspaceGroup deletes the given keyspace group. | ||
func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) { | ||
log.Info("delete keyspace group", zap.Uint32("keyspace-group-id", groupID)) | ||
|
||
kgm.Lock() | ||
defer kgm.Unlock() | ||
|
||
|
@@ -618,8 +674,6 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) { | |
am.close() | ||
kgm.ams[groupID] = nil | ||
} | ||
|
||
log.Info("deleted keyspace group", zap.Uint32("keyspace-group-id", groupID)) | ||
} | ||
|
||
// GetAllocatorManager returns the AllocatorManager of the given keyspace group | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete line 74 ~ 79 for two reasons:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rleungx ^
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me know if I missed anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right.