diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index 9c9eea77732..b73ddd7ed8f 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -49,12 +49,16 @@ const ( // We also reserved 0 for the keyspace group for the same purpose. DefaultKeyspaceGroupID = uint32(0) + // MicroserviceKey is the key of microservice. + MicroserviceKey = "ms" // APIServiceName is the name of api server. APIServiceName = "api" // TSOServiceName is the name of tso server. TSOServiceName = "tso" // ResourceManagerServiceName is the name of resource manager server. ResourceManagerServiceName = "resource_manager" + // KeyspaceGroupsKey is the path component of keyspace groups. + KeyspaceGroupsKey = "keyspace_groups" // MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso // is the sharding unit, i.e., by the definition here, the max count of the shards diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 9ae8a285085..2eca0e6bf17 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -53,11 +53,10 @@ const ( resourceGroupStatesPath = "states" controllerConfigPath = "controller" // tso storage endpoint has prefix `tso` - microserviceKey = "ms" - tsoServiceKey = utils.TSOServiceName - timestampKey = "timestamp" + tsoServiceKey = utils.TSOServiceName + timestampKey = "timestamp" - tsoKeyspaceGroupPrefix = "tso/keyspace_groups" + tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey keyspaceGroupMembershipKey = "membership" // we use uint64 to represent ID, the max length of uint64 is 20. @@ -238,20 +237,10 @@ func KeyspaceGroupIDPath(id uint32) string { return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id)) } -// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains -// the pattern of `tso/keyspace_groups/membership/(\d{5})$`. -func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) { +// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id. +func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp { pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/") - re := regexp.MustCompile(pattern) - match := re.FindStringSubmatch(path) - if match == nil { - return 0, fmt.Errorf("invalid keyspace group id path: %s", path) - } - id, err := strconv.ParseUint(match[1], 10, 32) - if err != nil { - return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err) - } - return uint32(id), nil + return regexp.MustCompile(pattern) } // encodeKeyspaceGroupID from uint32 to string. diff --git a/pkg/storage/endpoint/key_path_test.go b/pkg/storage/endpoint/key_path_test.go index 270d1e266fe..d6ef584105a 100644 --- a/pkg/storage/endpoint/key_path_test.go +++ b/pkg/storage/endpoint/key_path_test.go @@ -27,51 +27,3 @@ func BenchmarkRegionPath(b *testing.B) { _ = RegionPath(uint64(i)) } } - -func TestExtractKeyspaceGroupIDFromPath(t *testing.T) { - re := require.New(t) - - rightCases := []struct { - path string - id uint32 - }{ - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999}, - {path: "tso/keyspace_groups/membership/00000", id: 0}, - {path: "tso/keyspace_groups/membership/00001", id: 1}, - {path: "tso/keyspace_groups/membership/12345", id: 12345}, - {path: "tso/keyspace_groups/membership/99999", id: 99999}, - } - - for _, tt := range rightCases { - id, err := ExtractKeyspaceGroupIDFromPath(tt.path) - re.Equal(tt.id, id) - re.NoError(err) - } - - wrongCases := []struct { - path string - }{ - {path: ""}, - {path: "00001"}, - {path: "xxx/keyspace_groups/membership/00001"}, - {path: "tso/xxxxxxxxxxxxxxx/membership/00001"}, - {path: "tso/keyspace_groups/xxxxxxxxxx/00001"}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"}, - {path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"}, - {path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"}, - {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"}, - } - - for _, tt := range wrongCases { - _, err := ExtractKeyspaceGroupIDFromPath(tt.path) - re.Error(err) - } -} diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 6859ae82386..9eaea2bf48e 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "path" + "regexp" "sort" "strings" "sync" @@ -46,9 +47,9 @@ import ( ) const ( - // primaryElectionSuffix is the suffix of the key for keyspace group primary election - primaryElectionSuffix = "primary" - defaultRetryInterval = 500 * time.Millisecond + keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election" + // primaryKey is the key for keyspace group primary election. + primaryKey = "primary" ) type state struct { @@ -147,6 +148,32 @@ func (s *state) getKeyspaceGroupMetaWithCheck( mcsutils.DefaultKeyspaceGroupID, nil } +// kgPrimaryPathBuilder builds the path for keyspace group primary election. +// default keyspace group: "/ms/{cluster_id}/tso/00000/primary". +// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary". +type kgPrimaryPathBuilder struct { + // rootPath is "/ms/{cluster_id}/tso". + rootPath string + // defaultKeyspaceGroupIDPath is "/ms/{cluster_id}/tso/00000". + defaultKeyspaceGroupIDPath string +} + +// getKeyspaceGroupIDPath returns the keyspace group primary ID path. +// default keyspace group: "/ms/{cluster_id}/tso/00000". +// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}". +func (p *kgPrimaryPathBuilder) getKeyspaceGroupIDPath(keyspaceGroupID uint32) string { + if keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID { + return p.defaultKeyspaceGroupIDPath + } + return path.Join(p.rootPath, keyspaceGroupsElectionPath, fmt.Sprintf("%05d", keyspaceGroupID)) +} + +// getCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id. +func (p *kgPrimaryPathBuilder) getCompiledNonDefaultIDRegexp() *regexp.Regexp { + pattern := strings.Join([]string{p.rootPath, keyspaceGroupsElectionPath, `(\d{5})`, primaryKey + `$`}, "/") + return regexp.MustCompile(pattern) +} + // KeyspaceGroupManager manages the members of the keyspace groups assigned to this host. // The replicas campaign for the leaders which provide the tso service for the corresponding // keyspace groups. @@ -183,7 +210,9 @@ type KeyspaceGroupManager struct { // tsoSvcRootPath defines the root path for all etcd paths used in the tso microservices. // It is in the format of "/ms//tso". // The main paths for different usages include: - // 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary" + // 1. The path for keyspace group primary election. + // default keyspace group: "/ms/{cluster_id}/tso/00000/primary". + // non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary". // 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default // keyspace groups. // Key: /ms/{cluster_id}/tso/{group}/gta/timestamp @@ -204,10 +233,14 @@ type KeyspaceGroupManager struct { loadKeyspaceGroupsBatchSize int64 loadFromEtcdMaxRetryTimes int + // compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the + // keyspace group membership path. + compiledKGMembershipIDRegexp *regexp.Regexp // groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry. groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup + groupWatcher *etcdutil.LoopWatcher - groupWatcher *etcdutil.LoopWatcher + primaryPathBuilder *kgPrimaryPathBuilder } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. @@ -244,6 +277,11 @@ func NewKeyspaceGroupManager( kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) kgm.tsoSvcStorage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil) + kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp() + kgm.primaryPathBuilder = &kgPrimaryPathBuilder{ + rootPath: kgm.tsoSvcRootPath, + defaultKeyspaceGroupIDPath: path.Join(kgm.tsoSvcRootPath, "00000"), + } kgm.state.initialize() return kgm } @@ -268,7 +306,7 @@ func (kgm *KeyspaceGroupManager) Initialize() error { return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - groupID, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(kv.Key)) + groupID, err := ExtractKeyspaceGroupIDFromPath(kgm.compiledKGMembershipIDRegexp, string(kv.Key)) if err != nil { return err } @@ -303,6 +341,7 @@ func (kgm *KeyspaceGroupManager) Initialize() error { if kgm.loadKeyspaceGroupsBatchSize > 0 { kgm.groupWatcher.SetLoadBatchSize(kgm.loadKeyspaceGroupsBatchSize) } + kgm.wg.Add(1) go kgm.groupWatcher.StartWatchLoop() @@ -310,7 +349,7 @@ func (kgm *KeyspaceGroupManager) Initialize() error { log.Error("failed to initialize keyspace group manager", errs.ZapError(err)) // We might have partially loaded/initialized the keyspace groups. Close the manager to clean up. kgm.Close() - return errs.ErrLoadKeyspaceGroupsTerminated + return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) } if !defaultKGConfigured { @@ -388,8 +427,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro // Initialize the participant info to join the primary election. participant := member.NewParticipant(kgm.etcdClient) participant.InitInfo( - uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)), - primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) + uniqueName, uniqueID, kgm.primaryPathBuilder.getKeyspaceGroupIDPath(group.ID), + primaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr()) // If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group // is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot // be broken until the entire split process is completed. diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 671fc8b73aa..18a09a82bd6 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -183,7 +183,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad", "return(3)")) err := mgr.Initialize() // If loading keyspace groups timeout, the initialization should fail with ErrLoadKeyspaceGroupsTerminated. - re.Equal(errs.ErrLoadKeyspaceGroupsTerminated, err) + re.Contains(err.Error(), errs.ErrLoadKeyspaceGroupsTerminated.Error()) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad")) } @@ -592,9 +592,8 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string, ) *KeyspaceGroupManager { return NewKeyspaceGroupManager( - suite.ctx, tsoServiceID, suite.etcdClient, nil, - electionNamePrefix, legacySvcRootPath, tsoSvcRootPath, - suite.cfg) + suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, + legacySvcRootPath, tsoSvcRootPath, suite.cfg) } // runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment. diff --git a/pkg/tso/util.go b/pkg/tso/util.go new file mode 100644 index 00000000000..d7672373a32 --- /dev/null +++ b/pkg/tso/util.go @@ -0,0 +1,35 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "fmt" + "regexp" + "strconv" +) + +// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains +// the pattern of `tso/keyspace_groups/membership/(\d{5})$`. +func ExtractKeyspaceGroupIDFromPath(compiledRegexp *regexp.Regexp, path string) (uint32, error) { + match := compiledRegexp.FindStringSubmatch(path) + if match == nil { + return 0, fmt.Errorf("invalid keyspace group id path: %s", path) + } + id, err := strconv.ParseUint(match[1], 10, 32) + if err != nil { + return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err) + } + return uint32(id), nil +} diff --git a/pkg/tso/util_test.go b/pkg/tso/util_test.go new file mode 100644 index 00000000000..8b7c7a4ce0c --- /dev/null +++ b/pkg/tso/util_test.go @@ -0,0 +1,100 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "path" + "testing" + + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/storage/endpoint" +) + +func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) { + re := require.New(t) + + compiledRegexp := endpoint.GetCompiledKeyspaceGroupIDRegexp() + + rightCases := []struct { + path string + id uint32 + }{ + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999}, + {path: "tso/keyspace_groups/membership/00000", id: 0}, + {path: "tso/keyspace_groups/membership/00001", id: 1}, + {path: "tso/keyspace_groups/membership/12345", id: 12345}, + {path: "tso/keyspace_groups/membership/99999", id: 99999}, + } + + for _, tt := range rightCases { + id, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path) + re.Equal(tt.id, id) + re.NoError(err) + } + + wrongCases := []struct { + path string + }{ + {path: ""}, + {path: "00001"}, + {path: "xxx/keyspace_groups/membership/00001"}, + {path: "tso/xxxxxxxxxxxxxxx/membership/00001"}, + {path: "tso/keyspace_groups/xxxxxxxxxx/00001"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"}, + {path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"}, + {path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"}, + {path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"}, + } + + for _, tt := range wrongCases { + _, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path) + re.Error(err) + } +} + +func TestExtractKeyspaceGroupIDFromKeyspaceGroupPrimaryPath(t *testing.T) { + re := require.New(t) + + tsoSvcRootPath := "/ms/111/tso" + primaryPathBuilder := &kgPrimaryPathBuilder{ + rootPath: tsoSvcRootPath, + defaultKeyspaceGroupIDPath: path.Join(tsoSvcRootPath, "00000"), + } + + compiledRegexp := primaryPathBuilder.getCompiledNonDefaultIDRegexp() + + rightCases := []struct { + path string + id uint32 + }{ + {path: "/ms/111/tso/keyspace_groups/election/00001/primary", id: 1}, + {path: "/ms/111/tso/keyspace_groups/election/12345/primary", id: 12345}, + {path: "/ms/111/tso/keyspace_groups/election/99999/primary", id: 99999}, + } + + for _, tt := range rightCases { + id, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path) + re.Equal(tt.id, id) + re.NoError(err) + } +} diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index aca79b7a5aa..8849f7a4ab5 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -194,16 +194,24 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe re.NotNil(am) // Make sure every keyspace group is using the right timestamp path - // for loading/saving timestamp from/to etcd. - var timestampPath string + // for loading/saving timestamp from/to etcd and the right primary path + // for primary election. + var ( + timestampPath string + primaryPath string + ) clusterID := strconv.FormatUint(suite.pdLeaderServer.GetClusterID(), 10) if param.keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID { timestampPath = fmt.Sprintf("/pd/%s/timestamp", clusterID) + primaryPath = fmt.Sprintf("/ms/%s/tso/00000/primary", clusterID) } else { timestampPath = fmt.Sprintf("/ms/%s/tso/%05d/gta/timestamp", clusterID, param.keyspaceGroupID) + primaryPath = fmt.Sprintf("/ms/%s/tso/%s/election/%05d/primary", + clusterID, mcsutils.KeyspaceGroupsKey, param.keyspaceGroupID) } - re.Equal(timestampPath, am.GetTimestampPath("")) + re.Equal(timestampPath, am.GetTimestampPath(tsopkg.GlobalDCLocation)) + re.Equal(primaryPath, am.GetMember().GetLeaderPath()) served = true }