diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 6fa4941b1784..adcb0be3106c 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -515,9 +515,9 @@ func (manager *Manager) UpdateKeyspaceState(name string, newState keyspacepb.Key // Changing the state of default keyspace is not allowed. if name == utils.DefaultKeyspaceName { log.Warn("[keyspace] failed to update keyspace config", - zap.Error(errModifyDefault), + zap.Error(ErrModifyDefaultKeyspace), ) - return nil, errModifyDefault + return nil, ErrModifyDefaultKeyspace } var meta *keyspacepb.KeyspaceMeta err := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error { @@ -567,9 +567,9 @@ func (manager *Manager) UpdateKeyspaceStateByID(id uint32, newState keyspacepb.K // Changing the state of default keyspace is not allowed. if id == utils.DefaultKeyspaceID { log.Warn("[keyspace] failed to update keyspace config", - zap.Error(errModifyDefault), + zap.Error(ErrModifyDefaultKeyspace), ) - return nil, errModifyDefault + return nil, ErrModifyDefaultKeyspace } var meta *keyspacepb.KeyspaceMeta var err error diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index a1cc9a0e9b0f..ce9af8600398 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -536,6 +536,16 @@ func (m *GroupManager) SplitKeyspaceGroupByID( if splitSourceKg.IsMerging() { return ErrKeyspaceGroupInMerging(splitSourceID) } + // Build the new keyspace groups for split source and target. + var startKeyspaceID, endKeyspaceID uint32 + if len(keyspaceIDRange) >= 2 { + startKeyspaceID, endKeyspaceID = keyspaceIDRange[0], keyspaceIDRange[1] + } + splitSourceKeyspaces, splitTargetKeyspaces, err := buildSplitKeyspaces( + splitSourceKg.Keyspaces, keyspaces, startKeyspaceID, endKeyspaceID) + if err != nil { + return err + } // Check if the source keyspace group has enough replicas. if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount { return ErrKeyspaceGroupNotEnoughReplicas @@ -548,15 +558,6 @@ func (m *GroupManager) SplitKeyspaceGroupByID( if splitTargetKg != nil { return ErrKeyspaceGroupExists } - var startKeyspaceID, endKeyspaceID uint32 - if len(keyspaceIDRange) >= 2 { - startKeyspaceID, endKeyspaceID = keyspaceIDRange[0], keyspaceIDRange[1] - } - splitSourceKeyspaces, splitTargetKeyspaces, err := buildSplitKeyspaces( - splitSourceKg.Keyspaces, keyspaces, startKeyspaceID, endKeyspaceID) - if err != nil { - return err - } // Update the old keyspace group. splitSourceKg.Keyspaces = splitSourceKeyspaces splitSourceKg.SplitState = &endpoint.SplitState{ @@ -606,6 +607,9 @@ func buildSplitKeyspaces( oldKeyspaceMap[keyspace] = struct{}{} } for _, keyspace := range new { + if keyspace == utils.DefaultKeyspaceID { + return nil, nil, ErrModifyDefaultKeyspace + } if _, ok := oldKeyspaceMap[keyspace]; !ok { return nil, nil, ErrKeyspaceNotInKeyspaceGroup } @@ -618,15 +622,7 @@ func buildSplitKeyspaces( oldSplit = append(oldSplit, keyspace) } } - // Dedup new keyspaces if it's necessary. - if newNum == len(newKeyspaceMap) { - return oldSplit, new, nil - } - newSplit := make([]uint32, 0, len(newKeyspaceMap)) - for keyspace := range newKeyspaceMap { - newSplit = append(newSplit, keyspace) - } - return oldSplit, newSplit, nil + return oldSplit, new, nil } // Split according to the start and end keyspace ID. if startKeyspaceID == 0 && endKeyspaceID == 0 { @@ -637,6 +633,9 @@ func buildSplitKeyspaces( newKeyspaceMap = make(map[uint32]struct{}, newNum) ) for _, keyspace := range old { + if keyspace == utils.DefaultKeyspaceID { + return nil, nil, ErrModifyDefaultKeyspace + } if startKeyspaceID <= keyspace && keyspace <= endKeyspaceID { newSplit = append(newSplit, keyspace) newKeyspaceMap[keyspace] = struct{}{} @@ -770,7 +769,9 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount return nil, err } m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg) - log.Info("alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", id), zap.Reflect("nodes", nodes)) + log.Info("alloc nodes for keyspace group", + zap.Uint32("keyspace-group-id", id), + zap.Reflect("nodes", nodes)) return nodes, nil } @@ -906,20 +907,17 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin } groups[kgID] = kg } + // Build the new keyspaces for the merge target keyspace group. mergeTargetKg = groups[mergeTargetID] keyspaces := make(map[uint32]struct{}) for _, keyspace := range mergeTargetKg.Keyspaces { keyspaces[keyspace] = struct{}{} } - // Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group. for _, kgID := range mergeList { kg := groups[kgID] for _, keyspace := range kg.Keyspaces { keyspaces[keyspace] = struct{}{} } - if err := m.store.DeleteKeyspaceGroup(txn, kg.ID); err != nil { - return err - } } mergedKeyspaces := make([]uint32, 0, len(keyspaces)) for keyspace := range keyspaces { @@ -933,7 +931,17 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin mergeTargetKg.MergeState = &endpoint.MergeState{ MergeList: mergeList, } - return m.store.SaveKeyspaceGroup(txn, mergeTargetKg) + err = m.store.SaveKeyspaceGroup(txn, mergeTargetKg) + if err != nil { + return err + } + // Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group. + for _, kgID := range mergeList { + if err := m.store.DeleteKeyspaceGroup(txn, kgID); err != nil { + return err + } + } + return nil }); err != nil { return err } @@ -948,7 +956,10 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin // FinishMergeKeyspaceByID finishes the merging keyspace group by the merge target ID. func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { - var mergeTargetKg *endpoint.KeyspaceGroup + var ( + mergeTargetKg *endpoint.KeyspaceGroup + mergeList []uint32 + ) m.Lock() defer m.Unlock() if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { @@ -974,6 +985,7 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { return ErrKeyspaceGroupNotInMerging(kgID) } } + mergeList = mergeTargetKg.MergeState.MergeList mergeTargetKg.MergeState = nil return m.store.SaveKeyspaceGroup(txn, mergeTargetKg) }); err != nil { @@ -981,5 +993,8 @@ func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { } // Update the keyspace group cache. m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg) + log.Info("finish merge keyspace group", + zap.Uint32("merge-target-id", mergeTargetKg.ID), + zap.Reflect("merge-list", mergeList)) return nil } diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index 0eb71d2d5c4e..40b779382cd4 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -238,8 +238,9 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { keyspaceGroups := []*endpoint.KeyspaceGroup{ { - ID: uint32(1), - UserKind: endpoint.Basic.String(), + ID: uint32(1), + UserKind: endpoint.Basic.String(), + Keyspaces: []uint32{444}, }, { ID: uint32(2), @@ -250,8 +251,11 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { } err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups) re.NoError(err) + // split the default keyspace + err = suite.kgm.SplitKeyspaceGroupByID(0, 4, []uint32{utils.DefaultKeyspaceID}) + re.ErrorIs(err, ErrModifyDefaultKeyspace) // split the keyspace group 1 to 4 - err = suite.kgm.SplitKeyspaceGroupByID(1, 4, []uint32{333}) + err = suite.kgm.SplitKeyspaceGroupByID(1, 4, []uint32{444}) re.ErrorIs(err, ErrKeyspaceGroupNotEnoughReplicas) // split the keyspace group 2 to 4 without giving any keyspace err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{}) @@ -316,7 +320,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { err = suite.kgm.SplitKeyspaceGroupByID(3, 5, nil) re.ErrorContains(err, ErrKeyspaceGroupNotExists(3).Error()) // split into an existing keyspace group - err = suite.kgm.SplitKeyspaceGroupByID(2, 4, nil) + err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{111}) re.ErrorIs(err, ErrKeyspaceGroupExists) // split with the wrong keyspaces. err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444}) diff --git a/pkg/keyspace/util.go b/pkg/keyspace/util.go index ac8168aa5f21..bf4f8413bb17 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -80,8 +80,9 @@ var ( ErrNoAvailableNode = errors.New("no available node") // ErrExceedMaxEtcdTxnOps is used to indicate the number of etcd txn operations exceeds the limit. ErrExceedMaxEtcdTxnOps = errors.New("exceed max etcd txn operations") - errModifyDefault = errors.New("cannot modify default keyspace's state") - errIllegalOperation = errors.New("unknown operation") + // ErrModifyDefaultKeyspace is used to indicate that default keyspace cannot be modified. + ErrModifyDefaultKeyspace = errors.New("cannot modify default keyspace's state") + errIllegalOperation = errors.New("unknown operation") // stateTransitionTable lists all allowed next state for the given current state. // Note that transit from any state to itself is allowed for idempotence.