Skip to content

Commit

Permalink
Change keyspace group primary path.
Browse files Browse the repository at this point in the history
Summary of "keyspace group primary path change".

What:
The path for non-default keyspace group primary election changes from  "/ms/{cluster_id}/tso/{group}/primary" to "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
Default keyspace group keeps /ms/{cluster_id}/tso/00000/primary.

Why do we need this change?
We need to add watch loop to every TSO server to watch the primaries of all keyspace groups.
When #TSOServers > #GroupReplica, a TSO server only owns replicas of part of keyspace groups, i.e., the TSO server doesn't know the primaries of keyspace groups that the TSO server doesn't own.
The partial keyspace group state view leads to slow TSO service discovery and inconsistent membership view.
The above watch loop needs to watch the range [/ms/{cluster_id}/tso/00000/primary, /ms/{cluster_id}/tso/99999/primary],
but it overlaps with keyspace groups' timestamp path /ms/{cluster_id}/tso/{group}/{gta|lta}/.../timestamp which results in huge inefficiency (as far as I know, there is no built-in filter based on suffix).

Why do we keep the same path for Default keyspace group?
It has been deployed to prod, and the change will cause incompatability issues. Leave the change and upgrade to the future.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed May 26, 2023
1 parent 8b16b71 commit 8bbb45d
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 78 deletions.
4 changes: 4 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ const (
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)

// MicroserviceKey is the key of microservice.
MicroserviceKey = "ms"
// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
TSOServiceName = "tso"
// ResourceManagerServiceName is the name of resource manager server.
ResourceManagerServiceName = "resource_manager"
// KeyspaceGroupsKey is the path component of keyspace groups.
KeyspaceGroupsKey = "keyspace_groups"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
Expand Down
23 changes: 6 additions & 17 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ const (
resourceGroupStatesPath = "states"
controllerConfigPath = "controller"
// tso storage endpoint has prefix `tso`
microserviceKey = "ms"
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"

tsoKeyspaceGroupPrefix = "tso/keyspace_groups"
tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"

// we use uint64 to represent ID, the max length of uint64 is 20.
Expand Down Expand Up @@ -238,20 +237,10 @@ func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
}

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) {
// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id.
func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/")
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(path)
if match == nil {
return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
}
id, err := strconv.ParseUint(match[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err)
}
return uint32(id), nil
return regexp.MustCompile(pattern)
}

// encodeKeyspaceGroupID from uint32 to string.
Expand Down
48 changes: 0 additions & 48 deletions pkg/storage/endpoint/key_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,51 +27,3 @@ func BenchmarkRegionPath(b *testing.B) {
_ = RegionPath(uint64(i))
}
}

func TestExtractKeyspaceGroupIDFromPath(t *testing.T) {
re := require.New(t)

rightCases := []struct {
path string
id uint32
}{
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
{path: "tso/keyspace_groups/membership/00000", id: 0},
{path: "tso/keyspace_groups/membership/00001", id: 1},
{path: "tso/keyspace_groups/membership/12345", id: 12345},
{path: "tso/keyspace_groups/membership/99999", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}

wrongCases := []struct {
path string
}{
{path: ""},
{path: "00001"},
{path: "xxx/keyspace_groups/membership/00001"},
{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
}

for _, tt := range wrongCases {
_, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Error(err)
}
}
58 changes: 49 additions & 9 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"path"
"regexp"
"sort"
"strings"
"sync"
Expand All @@ -46,9 +47,10 @@ import (
)

const (
// primaryElectionSuffix is the suffix of the key for keyspace group primary election
primaryElectionSuffix = "primary"
defaultRetryInterval = 500 * time.Millisecond
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// leaderNameKey is the key for keyspace group primary election.
leaderNameKey = "primary"
defaultRetryInterval = 500 * time.Millisecond
)

type state struct {
Expand Down Expand Up @@ -147,6 +149,32 @@ func (s *state) getKeyspaceGroupMetaWithCheck(
mcsutils.DefaultKeyspaceGroupID, nil
}

// kgPrimaryPathBuilder builds the path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
type kgPrimaryPathBuilder struct {
// rootPath is "/ms/{cluster_id}/tso".
rootPath string
// defaultKeyspaceGroupIDPath is "/ms/{cluster_id}/tso/00000".
defaultKeyspaceGroupIDPath string
}

// getKeyspaceGroupIDPath returns the keyspace group primary ID path.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func (p *kgPrimaryPathBuilder) getKeyspaceGroupIDPath(keyspaceGroupID uint32) string {
if keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
return p.defaultKeyspaceGroupIDPath
}
return path.Join(p.rootPath, keyspaceGroupsElectionPath, fmt.Sprintf("%05d", keyspaceGroupID))
}

// getCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func (p *kgPrimaryPathBuilder) getCompiledNonDefaultIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{p.rootPath, keyspaceGroupsElectionPath, `(\d{5})`, leaderNameKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
Expand Down Expand Up @@ -183,7 +211,9 @@ type KeyspaceGroupManager struct {
// tsoSvcRootPath defines the root path for all etcd paths used in the tso microservices.
// It is in the format of "/ms/<cluster-id>/tso".
// The main paths for different usages include:
// 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary"
// 1. The path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
// 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default
// keyspace groups.
// Key: /ms/{cluster_id}/tso/{group}/gta/timestamp
Expand All @@ -204,10 +234,14 @@ type KeyspaceGroupManager struct {
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the
// keyspace group membership path.
compiledKGMembershipIDRegexp *regexp.Regexp
// groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry.
groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup
groupWatcher *etcdutil.LoopWatcher

groupWatcher *etcdutil.LoopWatcher
primaryPathBuilder *kgPrimaryPathBuilder
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -244,6 +278,11 @@ func NewKeyspaceGroupManager(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
kgm.tsoSvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil)
kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp()
kgm.primaryPathBuilder = &kgPrimaryPathBuilder{
rootPath: kgm.tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(kgm.tsoSvcRootPath, "00000"),
}
kgm.state.initialize()
return kgm
}
Expand All @@ -268,7 +307,7 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
groupID, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(kv.Key))
groupID, err := ExtractKeyspaceGroupIDFromPath(kgm.compiledKGMembershipIDRegexp, string(kv.Key))
if err != nil {
return err
}
Expand Down Expand Up @@ -303,14 +342,15 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
if kgm.loadKeyspaceGroupsBatchSize > 0 {
kgm.groupWatcher.SetLoadBatchSize(kgm.loadKeyspaceGroupsBatchSize)
}

kgm.wg.Add(1)
go kgm.groupWatcher.StartWatchLoop()

if err := kgm.groupWatcher.WaitLoad(); err != nil {
log.Error("failed to initialize keyspace group manager", errs.ZapError(err))
// We might have partially loaded/initialized the keyspace groups. Close the manager to clean up.
kgm.Close()
return errs.ErrLoadKeyspaceGroupsTerminated
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}

if !defaultKGConfigured {
Expand Down Expand Up @@ -388,8 +428,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
// Initialize the participant info to join the primary election.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)),
primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
uniqueName, uniqueID, kgm.primaryPathBuilder.getKeyspaceGroupIDPath(group.ID),
leaderNameKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down
7 changes: 3 additions & 4 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad", "return(3)"))
err := mgr.Initialize()
// If loading keyspace groups timeout, the initialization should fail with ErrLoadKeyspaceGroupsTerminated.
re.Equal(errs.ErrLoadKeyspaceGroupsTerminated, err)
re.Contains(err.Error(), errs.ErrLoadKeyspaceGroupsTerminated.Error())
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad"))
}

Expand Down Expand Up @@ -592,9 +592,8 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager(
electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string,
) *KeyspaceGroupManager {
return NewKeyspaceGroupManager(
suite.ctx, tsoServiceID, suite.etcdClient, nil,
electionNamePrefix, legacySvcRootPath, tsoSvcRootPath,
suite.cfg)
suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix,
legacySvcRootPath, tsoSvcRootPath, suite.cfg)
}

// runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment.
Expand Down
35 changes: 35 additions & 0 deletions pkg/tso/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"fmt"
"regexp"
"strconv"
)

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
func ExtractKeyspaceGroupIDFromPath(compiledRegexp *regexp.Regexp, path string) (uint32, error) {
match := compiledRegexp.FindStringSubmatch(path)
if match == nil {
return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
}
id, err := strconv.ParseUint(match[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err)
}
return uint32(id), nil
}
100 changes: 100 additions & 0 deletions pkg/tso/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"path"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage/endpoint"
)

func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) {
re := require.New(t)

compiledRegexp := endpoint.GetCompiledKeyspaceGroupIDRegexp()

rightCases := []struct {
path string
id uint32
}{
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
{path: "tso/keyspace_groups/membership/00000", id: 0},
{path: "tso/keyspace_groups/membership/00001", id: 1},
{path: "tso/keyspace_groups/membership/12345", id: 12345},
{path: "tso/keyspace_groups/membership/99999", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}

wrongCases := []struct {
path string
}{
{path: ""},
{path: "00001"},
{path: "xxx/keyspace_groups/membership/00001"},
{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
}

for _, tt := range wrongCases {
_, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path)
re.Error(err)
}
}

func TestExtractKeyspaceGroupIDFromKeyspaceGroupPrimaryPath(t *testing.T) {
re := require.New(t)

tsoSvcRootPath := "/ms/111/tso"
primaryPathBuilder := &kgPrimaryPathBuilder{
rootPath: tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(tsoSvcRootPath, "00000"),
}

compiledRegexp := primaryPathBuilder.getCompiledNonDefaultIDRegexp()

rightCases := []struct {
path string
id uint32
}{
{path: "/ms/111/tso/keyspace_groups/election/00001/primary", id: 1},
{path: "/ms/111/tso/keyspace_groups/election/12345/primary", id: 12345},
{path: "/ms/111/tso/keyspace_groups/election/99999/primary", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}
}

0 comments on commit 8bbb45d

Please sign in to comment.