Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs, tso: change keyspace group primary path. #6526

Merged
merged 4 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ const (
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)

// MicroserviceKey is the key of microservice.
MicroserviceKey = "ms"
// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
TSOServiceName = "tso"
// ResourceManagerServiceName is the name of resource manager server.
ResourceManagerServiceName = "resource_manager"
// KeyspaceGroupsKey is the path component of keyspace groups.
KeyspaceGroupsKey = "keyspace_groups"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
Expand Down
23 changes: 6 additions & 17 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ const (
resourceGroupStatesPath = "states"
controllerConfigPath = "controller"
// tso storage endpoint has prefix `tso`
microserviceKey = "ms"
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"

tsoKeyspaceGroupPrefix = "tso/keyspace_groups"
tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"

// we use uint64 to represent ID, the max length of uint64 is 20.
Expand Down Expand Up @@ -238,20 +237,10 @@ func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
}

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) {
// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to separate it into two functions?

Copy link
Contributor Author

@binshi-bing binshi-bing May 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

performance concern. Compile the patten once and use it every time for matching to boost performance. It should save time for loading all the existing keyspace groups before starting watch loop, thus speed up server bootstrap.

func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/")
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(path)
if match == nil {
return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
}
id, err := strconv.ParseUint(match[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err)
}
return uint32(id), nil
return regexp.MustCompile(pattern)
}

// encodeKeyspaceGroupID from uint32 to string.
Expand Down
48 changes: 0 additions & 48 deletions pkg/storage/endpoint/key_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,51 +27,3 @@ func BenchmarkRegionPath(b *testing.B) {
_ = RegionPath(uint64(i))
}
}

func TestExtractKeyspaceGroupIDFromPath(t *testing.T) {
re := require.New(t)

rightCases := []struct {
path string
id uint32
}{
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
{path: "tso/keyspace_groups/membership/00000", id: 0},
{path: "tso/keyspace_groups/membership/00001", id: 1},
{path: "tso/keyspace_groups/membership/12345", id: 12345},
{path: "tso/keyspace_groups/membership/99999", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}

wrongCases := []struct {
path string
}{
{path: ""},
{path: "00001"},
{path: "xxx/keyspace_groups/membership/00001"},
{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
}

for _, tt := range wrongCases {
_, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Error(err)
}
}
57 changes: 48 additions & 9 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"path"
"regexp"
"sort"
"strings"
"sync"
Expand All @@ -46,9 +47,9 @@ import (
)

const (
// primaryElectionSuffix is the suffix of the key for keyspace group primary election
primaryElectionSuffix = "primary"
defaultRetryInterval = 500 * time.Millisecond
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// primaryKey is the key for keyspace group primary election.
primaryKey = "primary"
)

type state struct {
Expand Down Expand Up @@ -147,6 +148,32 @@ func (s *state) getKeyspaceGroupMetaWithCheck(
mcsutils.DefaultKeyspaceGroupID, nil
}

// kgPrimaryPathBuilder builds the path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
type kgPrimaryPathBuilder struct {
// rootPath is "/ms/{cluster_id}/tso".
rootPath string
// defaultKeyspaceGroupIDPath is "/ms/{cluster_id}/tso/00000".
defaultKeyspaceGroupIDPath string
}

// getKeyspaceGroupIDPath returns the keyspace group primary ID path.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func (p *kgPrimaryPathBuilder) getKeyspaceGroupIDPath(keyspaceGroupID uint32) string {
if keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
return p.defaultKeyspaceGroupIDPath
}
return path.Join(p.rootPath, keyspaceGroupsElectionPath, fmt.Sprintf("%05d", keyspaceGroupID))
}

// getCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func (p *kgPrimaryPathBuilder) getCompiledNonDefaultIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{p.rootPath, keyspaceGroupsElectionPath, `(\d{5})`, primaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
Expand Down Expand Up @@ -183,7 +210,9 @@ type KeyspaceGroupManager struct {
// tsoSvcRootPath defines the root path for all etcd paths used in the tso microservices.
// It is in the format of "/ms/<cluster-id>/tso".
// The main paths for different usages include:
// 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary"
// 1. The path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
// 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default
// keyspace groups.
// Key: /ms/{cluster_id}/tso/{group}/gta/timestamp
Expand All @@ -204,10 +233,14 @@ type KeyspaceGroupManager struct {
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the
// keyspace group membership path.
compiledKGMembershipIDRegexp *regexp.Regexp
// groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry.
groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup
groupWatcher *etcdutil.LoopWatcher

groupWatcher *etcdutil.LoopWatcher
primaryPathBuilder *kgPrimaryPathBuilder
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -244,6 +277,11 @@ func NewKeyspaceGroupManager(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
kgm.tsoSvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil)
kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp()
kgm.primaryPathBuilder = &kgPrimaryPathBuilder{
rootPath: kgm.tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(kgm.tsoSvcRootPath, "00000"),
}
kgm.state.initialize()
return kgm
}
Expand All @@ -268,7 +306,7 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
groupID, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(kv.Key))
groupID, err := ExtractKeyspaceGroupIDFromPath(kgm.compiledKGMembershipIDRegexp, string(kv.Key))
if err != nil {
return err
}
Expand Down Expand Up @@ -303,14 +341,15 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
if kgm.loadKeyspaceGroupsBatchSize > 0 {
kgm.groupWatcher.SetLoadBatchSize(kgm.loadKeyspaceGroupsBatchSize)
}

kgm.wg.Add(1)
go kgm.groupWatcher.StartWatchLoop()

if err := kgm.groupWatcher.WaitLoad(); err != nil {
log.Error("failed to initialize keyspace group manager", errs.ZapError(err))
// We might have partially loaded/initialized the keyspace groups. Close the manager to clean up.
kgm.Close()
return errs.ErrLoadKeyspaceGroupsTerminated
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}

if !defaultKGConfigured {
Expand Down Expand Up @@ -388,8 +427,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
// Initialize the participant info to join the primary election.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)),
primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
uniqueName, uniqueID, kgm.primaryPathBuilder.getKeyspaceGroupIDPath(group.ID),
primaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down
7 changes: 3 additions & 4 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad", "return(3)"))
err := mgr.Initialize()
// If loading keyspace groups timeout, the initialization should fail with ErrLoadKeyspaceGroupsTerminated.
re.Equal(errs.ErrLoadKeyspaceGroupsTerminated, err)
re.Contains(err.Error(), errs.ErrLoadKeyspaceGroupsTerminated.Error())
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad"))
}

Expand Down Expand Up @@ -592,9 +592,8 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager(
electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string,
) *KeyspaceGroupManager {
return NewKeyspaceGroupManager(
suite.ctx, tsoServiceID, suite.etcdClient, nil,
electionNamePrefix, legacySvcRootPath, tsoSvcRootPath,
suite.cfg)
suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix,
legacySvcRootPath, tsoSvcRootPath, suite.cfg)
}

// runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment.
Expand Down
35 changes: 35 additions & 0 deletions pkg/tso/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"fmt"
"regexp"
"strconv"
)

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
func ExtractKeyspaceGroupIDFromPath(compiledRegexp *regexp.Regexp, path string) (uint32, error) {
match := compiledRegexp.FindStringSubmatch(path)
if match == nil {
return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
}
id, err := strconv.ParseUint(match[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err)
}
return uint32(id), nil
}
100 changes: 100 additions & 0 deletions pkg/tso/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"path"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage/endpoint"
)

func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) {
re := require.New(t)

compiledRegexp := endpoint.GetCompiledKeyspaceGroupIDRegexp()

rightCases := []struct {
path string
id uint32
}{
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
{path: "tso/keyspace_groups/membership/00000", id: 0},
{path: "tso/keyspace_groups/membership/00001", id: 1},
{path: "tso/keyspace_groups/membership/12345", id: 12345},
{path: "tso/keyspace_groups/membership/99999", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}

wrongCases := []struct {
path string
}{
{path: ""},
{path: "00001"},
{path: "xxx/keyspace_groups/membership/00001"},
{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
}

for _, tt := range wrongCases {
_, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path)
re.Error(err)
}
}

func TestExtractKeyspaceGroupIDFromKeyspaceGroupPrimaryPath(t *testing.T) {
re := require.New(t)

tsoSvcRootPath := "/ms/111/tso"
primaryPathBuilder := &kgPrimaryPathBuilder{
rootPath: tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(tsoSvcRootPath, "00000"),
}

compiledRegexp := primaryPathBuilder.getCompiledNonDefaultIDRegexp()

rightCases := []struct {
path string
id uint32
}{
{path: "/ms/111/tso/keyspace_groups/election/00001/primary", id: 1},
{path: "/ms/111/tso/keyspace_groups/election/12345/primary", id: 12345},
{path: "/ms/111/tso/keyspace_groups/election/99999/primary", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}
}
Loading