Skip to content

Commit

Permalink
mcs, tso: change keyspace group primary path. (tikv#6526)
Browse files Browse the repository at this point in the history
ref tikv#5895

mcs, tso: change keyspace group primary path.

The path for non-default keyspace group primary election changes
from  "/ms/{cluster_id}/tso/{group}/primary" to "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
Default keyspace group keeps /ms/{cluster_id}/tso/00000/primary.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored and rleungx committed Aug 2, 2023
1 parent ff7d54d commit 13e3e93
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 81 deletions.
4 changes: 4 additions & 0 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,16 @@ const (
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)

// MicroserviceKey is the key of microservice.
MicroserviceKey = "ms"
// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
TSOServiceName = "tso"
// ResourceManagerServiceName is the name of resource manager server.
ResourceManagerServiceName = "resource_manager"
// KeyspaceGroupsKey is the path component of keyspace groups.
KeyspaceGroupsKey = "keyspace_groups"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
Expand Down
23 changes: 6 additions & 17 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ const (
resourceGroupStatesPath = "states"
controllerConfigPath = "controller"
// tso storage endpoint has prefix `tso`
microserviceKey = "ms"
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"

tsoKeyspaceGroupPrefix = "tso/keyspace_groups"
tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"

// we use uint64 to represent ID, the max length of uint64 is 20.
Expand Down Expand Up @@ -240,20 +239,10 @@ func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
}

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) {
// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id.
func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/")
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(path)
if match == nil {
return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
}
id, err := strconv.ParseUint(match[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err)
}
return uint32(id), nil
return regexp.MustCompile(pattern)
}

// encodeKeyspaceGroupID from uint32 to string.
Expand Down
48 changes: 0 additions & 48 deletions pkg/storage/endpoint/key_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,51 +27,3 @@ func BenchmarkRegionPath(b *testing.B) {
_ = RegionPath(uint64(i))
}
}

func TestExtractKeyspaceGroupIDFromPath(t *testing.T) {
re := require.New(t)

rightCases := []struct {
path string
id uint32
}{
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
{path: "tso/keyspace_groups/membership/00000", id: 0},
{path: "tso/keyspace_groups/membership/00001", id: 1},
{path: "tso/keyspace_groups/membership/12345", id: 12345},
{path: "tso/keyspace_groups/membership/99999", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}

wrongCases := []struct {
path string
}{
{path: ""},
{path: "00001"},
{path: "xxx/keyspace_groups/membership/00001"},
{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
}

for _, tt := range wrongCases {
_, err := ExtractKeyspaceGroupIDFromPath(tt.path)
re.Error(err)
}
}
57 changes: 48 additions & 9 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"path"
"regexp"
"sort"
"strings"
"sync"
Expand All @@ -46,9 +47,9 @@ import (
)

const (
// primaryElectionSuffix is the suffix of the key for keyspace group primary election
primaryElectionSuffix = "primary"
defaultRetryInterval = 500 * time.Millisecond
keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election"
// primaryKey is the key for keyspace group primary election.
primaryKey = "primary"
)

type state struct {
Expand Down Expand Up @@ -147,6 +148,32 @@ func (s *state) getKeyspaceGroupMetaWithCheck(
mcsutils.DefaultKeyspaceGroupID, nil
}

// kgPrimaryPathBuilder builds the path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
type kgPrimaryPathBuilder struct {
// rootPath is "/ms/{cluster_id}/tso".
rootPath string
// defaultKeyspaceGroupIDPath is "/ms/{cluster_id}/tso/00000".
defaultKeyspaceGroupIDPath string
}

// getKeyspaceGroupIDPath returns the keyspace group primary ID path.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func (p *kgPrimaryPathBuilder) getKeyspaceGroupIDPath(keyspaceGroupID uint32) string {
if keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
return p.defaultKeyspaceGroupIDPath
}
return path.Join(p.rootPath, keyspaceGroupsElectionPath, fmt.Sprintf("%05d", keyspaceGroupID))
}

// getCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func (p *kgPrimaryPathBuilder) getCompiledNonDefaultIDRegexp() *regexp.Regexp {
pattern := strings.Join([]string{p.rootPath, keyspaceGroupsElectionPath, `(\d{5})`, primaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
Expand Down Expand Up @@ -183,7 +210,9 @@ type KeyspaceGroupManager struct {
// tsoSvcRootPath defines the root path for all etcd paths used in the tso microservices.
// It is in the format of "/ms/<cluster-id>/tso".
// The main paths for different usages include:
// 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary"
// 1. The path for keyspace group primary election.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
// 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default
// keyspace groups.
// Key: /ms/{cluster_id}/tso/{group}/gta/timestamp
Expand All @@ -204,10 +233,14 @@ type KeyspaceGroupManager struct {
loadKeyspaceGroupsBatchSize int64
loadFromEtcdMaxRetryTimes int

// compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the
// keyspace group membership path.
compiledKGMembershipIDRegexp *regexp.Regexp
// groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry.
groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup
groupWatcher *etcdutil.LoopWatcher

groupWatcher *etcdutil.LoopWatcher
primaryPathBuilder *kgPrimaryPathBuilder
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
Expand Down Expand Up @@ -244,6 +277,11 @@ func NewKeyspaceGroupManager(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
kgm.tsoSvcStorage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil)
kgm.compiledKGMembershipIDRegexp = endpoint.GetCompiledKeyspaceGroupIDRegexp()
kgm.primaryPathBuilder = &kgPrimaryPathBuilder{
rootPath: kgm.tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(kgm.tsoSvcRootPath, "00000"),
}
kgm.state.initialize()
return kgm
}
Expand All @@ -268,7 +306,7 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
groupID, err := endpoint.ExtractKeyspaceGroupIDFromPath(string(kv.Key))
groupID, err := ExtractKeyspaceGroupIDFromPath(kgm.compiledKGMembershipIDRegexp, string(kv.Key))
if err != nil {
return err
}
Expand Down Expand Up @@ -303,14 +341,15 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
if kgm.loadKeyspaceGroupsBatchSize > 0 {
kgm.groupWatcher.SetLoadBatchSize(kgm.loadKeyspaceGroupsBatchSize)
}

kgm.wg.Add(1)
go kgm.groupWatcher.StartWatchLoop()

if err := kgm.groupWatcher.WaitLoad(); err != nil {
log.Error("failed to initialize keyspace group manager", errs.ZapError(err))
// We might have partially loaded/initialized the keyspace groups. Close the manager to clean up.
kgm.Close()
return errs.ErrLoadKeyspaceGroupsTerminated
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}

if !defaultKGConfigured {
Expand Down Expand Up @@ -388,8 +427,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
// Initialize the participant info to join the primary election.
participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID, path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)),
primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
uniqueName, uniqueID, kgm.primaryPathBuilder.getKeyspaceGroupIDPath(group.ID),
primaryKey, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())
// If the keyspace group is in split, we should ensure that the primary elected by the new keyspace group
// is always on the same TSO Server node as the primary of the old keyspace group, and this constraint cannot
// be broken until the entire split process is completed.
Expand Down
7 changes: 3 additions & 4 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad", "return(3)"))
err := mgr.Initialize()
// If loading keyspace groups timeout, the initialization should fail with ErrLoadKeyspaceGroupsTerminated.
re.Equal(errs.ErrLoadKeyspaceGroupsTerminated, err)
re.Contains(err.Error(), errs.ErrLoadKeyspaceGroupsTerminated.Error())
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/delayLoad"))
}

Expand Down Expand Up @@ -592,9 +592,8 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager(
electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string,
) *KeyspaceGroupManager {
return NewKeyspaceGroupManager(
suite.ctx, tsoServiceID, suite.etcdClient, nil,
electionNamePrefix, legacySvcRootPath, tsoSvcRootPath,
suite.cfg)
suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix,
legacySvcRootPath, tsoSvcRootPath, suite.cfg)
}

// runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment.
Expand Down
35 changes: 35 additions & 0 deletions pkg/tso/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"fmt"
"regexp"
"strconv"
)

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
func ExtractKeyspaceGroupIDFromPath(compiledRegexp *regexp.Regexp, path string) (uint32, error) {
match := compiledRegexp.FindStringSubmatch(path)
if match == nil {
return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
}
id, err := strconv.ParseUint(match[1], 10, 32)
if err != nil {
return 0, fmt.Errorf("failed to parse keyspace group ID: %v", err)
}
return uint32(id), nil
}
100 changes: 100 additions & 0 deletions pkg/tso/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"path"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage/endpoint"
)

func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) {
re := require.New(t)

compiledRegexp := endpoint.GetCompiledKeyspaceGroupIDRegexp()

rightCases := []struct {
path string
id uint32
}{
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
{path: "tso/keyspace_groups/membership/00000", id: 0},
{path: "tso/keyspace_groups/membership/00001", id: 1},
{path: "tso/keyspace_groups/membership/12345", id: 12345},
{path: "tso/keyspace_groups/membership/99999", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}

wrongCases := []struct {
path string
}{
{path: ""},
{path: "00001"},
{path: "xxx/keyspace_groups/membership/00001"},
{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
}

for _, tt := range wrongCases {
_, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path)
re.Error(err)
}
}

func TestExtractKeyspaceGroupIDFromKeyspaceGroupPrimaryPath(t *testing.T) {
re := require.New(t)

tsoSvcRootPath := "/ms/111/tso"
primaryPathBuilder := &kgPrimaryPathBuilder{
rootPath: tsoSvcRootPath,
defaultKeyspaceGroupIDPath: path.Join(tsoSvcRootPath, "00000"),
}

compiledRegexp := primaryPathBuilder.getCompiledNonDefaultIDRegexp()

rightCases := []struct {
path string
id uint32
}{
{path: "/ms/111/tso/keyspace_groups/election/00001/primary", id: 1},
{path: "/ms/111/tso/keyspace_groups/election/12345/primary", id: 12345},
{path: "/ms/111/tso/keyspace_groups/election/99999/primary", id: 99999},
}

for _, tt := range rightCases {
id, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path)
re.Equal(tt.id, id)
re.NoError(err)
}
}
Loading

0 comments on commit 13e3e93

Please sign in to comment.