Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace: patrol the keyspace assignment in batch #6411

Merged
merged 2 commits into from
May 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 96 additions & 59 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
UserKindKey = "user_kind"
// TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config.
TSOKeyspaceGroupIDKey = "tso_keyspace_group_id"
// keyspacePatrolBatchSize is the batch size for keyspace assignment patrol.
keyspacePatrolBatchSize = 256
)

// Config is the interface for keyspace config.
Expand All @@ -72,6 +74,8 @@ type Manager struct {
config Config
// kgm is the keyspace group manager of the server.
kgm *GroupManager
// nextPatrolStartID is the next start id of keyspace assignment patrol.
nextPatrolStartID uint32
}

// CreateKeyspaceRequest represents necessary arguments to create a keyspace.
Expand All @@ -94,13 +98,14 @@ func NewKeyspaceManager(
kgm *GroupManager,
) *Manager {
return &Manager{
metaLock: syncutil.NewLockGroup(syncutil.WithHash(keyspaceIDHash)),
idAllocator: idAllocator,
store: store,
cluster: cluster,
ctx: ctx,
config: config,
kgm: kgm,
metaLock: syncutil.NewLockGroup(syncutil.WithHash(keyspaceIDHash)),
idAllocator: idAllocator,
store: store,
cluster: cluster,
ctx: ctx,
config: config,
kgm: kgm,
nextPatrolStartID: utils.DefaultKeyspaceID,
}
}

Expand Down Expand Up @@ -570,64 +575,96 @@ func (manager *Manager) allocID() (uint32, error) {

// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (manager *Manager) PatrolKeyspaceAssignment() error {
// TODO: since the number of keyspaces might be large, we should consider to assign them in batches.
return manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
if err != nil {
return err
}
if defaultKeyspaceGroup == nil {
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, utils.DefaultKeyspaceID, 0)
if err != nil {
return err
}
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, len(keyspaces))
)
defer func() {
for _, id := range keyspaceIDsToUnlock {
manager.metaLock.Unlock(id)
var (
// The current start ID of the patrol, used for logging.
currentStartID = manager.nextPatrolStartID
// The next start ID of the patrol, used for the next patrol.
nextStartID = currentStartID
moreToPatrol = true
err error
)
for moreToPatrol {
err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
if err != nil {
return err
}
if defaultKeyspaceGroup == nil {
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, keyspacePatrolBatchSize)
if err != nil {
return err
}
}()
for _, ks := range keyspaces {
if ks == nil {
continue
keyspaceNum := len(keyspaces)
// If there are more than one keyspace, update the current and next start IDs.
if keyspaceNum > 0 {
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
ks.Config = make(map[string]string, 1)
} else {
// If the keyspace already has a group ID, skip it.
_, ok := ks.Config[TSOKeyspaceGroupIDKey]
if ok {
// If there are less than `keyspacePatrolBatchSize` keyspaces,
// we have reached the end of the keyspace list.
moreToPatrol = keyspaceNum == keyspacePatrolBatchSize
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum)
)
defer func() {
for _, id := range keyspaceIDsToUnlock {
manager.metaLock.Unlock(id)
}
}()
for _, ks := range keyspaces {
if ks == nil {
continue
}
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
ks.Config = make(map[string]string, 1)
} else if _, ok := ks.Config[TSOKeyspaceGroupIDKey]; ok {
// If the keyspace already has a group ID, skip it.
manager.metaLock.Unlock(ks.Id)
continue
}
// Unlock the keyspace meta lock after the whole txn.
keyspaceIDsToUnlock = append(keyspaceIDsToUnlock, ks.Id)
// If the keyspace doesn't have a group ID, assign it to the default keyspace group.
if !slice.Contains(defaultKeyspaceGroup.Keyspaces, ks.Id) {
defaultKeyspaceGroup.Keyspaces = append(defaultKeyspaceGroup.Keyspaces, ks.Id)
// Only save the keyspace group meta if any keyspace is assigned to it.
assigned = true
}
ks.Config[TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(utils.DefaultKeyspaceGroupID), 10)
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
zap.Uint32("keyspace-id", ks.Id), zap.Error(err))
return err
}
}
// Unlock the keyspace meta lock after the whole txn.
keyspaceIDsToUnlock = append(keyspaceIDsToUnlock, ks.Id)
// If the keyspace doesn't have a group ID, assign it to the default keyspace group.
if !slice.Contains(defaultKeyspaceGroup.Keyspaces, ks.Id) {
defaultKeyspaceGroup.Keyspaces = append(defaultKeyspaceGroup.Keyspaces, ks.Id)
assigned = true
}
ks.Config[TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(utils.DefaultKeyspaceGroupID), 10)
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Uint32("ID", ks.Id), zap.Error(err))
return err
if assigned {
err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

Suggested change
if err != nil {
if err := manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup); err != nil

log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID), zap.Error(err))
return err
}
}
return nil
})
if err != nil {
return err
}
if assigned {
return manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
}
return nil
})
// If all keyspaces in the current batch are assigned, update the next start ID.
manager.nextPatrolStartID = nextStartID
}
return nil
}
33 changes: 33 additions & 0 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,36 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
re.NotNil(defaultKeyspaceGroup)
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(111))
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Name: strconv.Itoa(i),
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
}
// Check if all the keyspaces are not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
re.NoError(err)
// Check if all the keyspaces are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
}