Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs, tso: support multi-keyspace-group and its service discovery in E2E path #6321

Merged
merged 27 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6d0c757
Support multi-keyspace-group in PD(TSO) client
binshi-bing Apr 14, 2023
fe77454
Complete server side implementation
binshi-bing Apr 17, 2023
8a256cd
Complete client side implementation
binshi-bing Apr 18, 2023
5810320
Improve the server/client side logic
binshi-bing Apr 19, 2023
ea1b2ad
Fixed test failures
binshi-bing Apr 19, 2023
56acd92
Fix "invalid argument to Intn"
binshi-bing Apr 19, 2023
550061d
fix test failure in TestAPIServerForwardTestSuite/TestForwardTSOWhenP…
binshi-bing Apr 19, 2023
5897164
Handle feedback
binshi-bing Apr 20, 2023
78b57b4
Improve implementation and fix test failures.
binshi-bing Apr 21, 2023
06f7ff1
remove debugging logs
binshi-bing Apr 21, 2023
e84fcc3
fix TestKeyspaceGroupTestSuite/TestReplica failure
binshi-bing Apr 21, 2023
9984c7c
Fix panic
binshi-bing Apr 21, 2023
7fcc436
Add pd leader bootstrap in test so that pd client can discover tso no…
binshi-bing Apr 21, 2023
d4b1bf3
fix lock
binshi-bing Apr 21, 2023
eb01686
Handle feedback
binshi-bing Apr 21, 2023
f7f16dd
refine checkServiceModeChanged
binshi-bing Apr 21, 2023
d27ead1
Fix ErrClientGetLeader log output and enable verbose test for ci test…
binshi-bing Apr 23, 2023
d025314
Always start tso server watch loop in the group manager instead of du…
binshi-bing Apr 24, 2023
8d475b7
handle feedback
binshi-bing Apr 25, 2023
cb6bfe1
fix TestMicroserviceTSOClient/TestRandomShutdown failure
binshi-bing Apr 25, 2023
ff59724
handle more feedback
binshi-bing Apr 25, 2023
9dd37d5
Handle feedback.
binshi-bing Apr 25, 2023
4c342bc
Trigger tests
binshi-bing Apr 25, 2023
5825876
fix nil am when checking IsKeyspaceServing.
binshi-bing Apr 25, 2023
917118f
remove -v from ci-test-job in pd/tests/integrations/tso/Makefile
binshi-bing Apr 26, 2023
936843d
Upgrade to use the latest kvproto
binshi-bing Apr 26, 2023
531c5ec
Merge branch 'master' into client-multi-kg
ti-chi-bot[bot] Apr 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ import (
const (
// defaultKeyspaceID is the default key space id.
// Valid keyspace id range is [0, 0xFFFFFF](uint24max, or 16777215)
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap and reserved for users who haven't been assigned keyspace.
// ​0 is reserved for default keyspace with the name "DEFAULT", It's initialized when PD bootstrap
// and reserved for users who haven't been assigned keyspace.
defaultKeyspaceID = uint32(0)
// defaultKeySpaceGroupID is the default key space group id.
// We also reserved 0 for the keyspace group for the same purpose.
defaultKeySpaceGroupID = uint32(0)
)

// Region contains information of a region's meta and its peers.
Expand Down Expand Up @@ -205,6 +209,8 @@ var (
errClosing = errors.New("[pd] closing")
// errTSOLength is returned when the number of response timestamps is inconsistent with request.
errTSOLength = errors.New("[pd] tso length in rpc response is incorrect")
// errInvalidRespHeader is returned when the response doesn't contain service mode info unexpectedly.
errNoServiceModeReturned = errors.New("[pd] no service mode returned")
)

// ClientOption configures client.
Expand Down Expand Up @@ -380,6 +386,7 @@ func (c *client) Close() {
func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
c.Lock()
defer c.Unlock()

if newMode == c.serviceMode {
return
}
Expand All @@ -396,13 +403,18 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(c.ctx, MetaStorageClient(c),
c.GetClusterID(c.ctx), c.keyspaceID, c.svrUrls, c.tlsCfg, c.option)
newTSOSvcDiscovery = newTSOServiceDiscovery(
c.ctx, MetaStorageClient(c), c.pdSvcDiscovery,
c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
zap.Strings("svr-urls", c.svrUrls), zap.String("current-mode", c.serviceMode.String()), zap.Error(err))
zap.Strings("svr-urls", c.svrUrls),
zap.String("current-mode", c.serviceMode.String()),
zap.Error(err))
return
}
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
Expand Down Expand Up @@ -602,11 +614,10 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
req.start = time.Now()
req.keyspaceID = c.keyspaceID
req.dcLocation = dcLocation

if tsoClient == nil {
req.done <- errs.ErrClientGetTSO
req.done <- errs.ErrClientGetTSO.FastGenByArgs("tso client is nil")
return req
}

Expand Down
6 changes: 3 additions & 3 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func TestUpdateURLs(t *testing.T) {
cli := &pdServiceDiscovery{option: newOption()}
cli.urls.Store([]string{})
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetURLs())
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members[1:])
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetURLs())
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs())
cli.updateURLs(members)
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetURLs())
re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetServiceURLs())
}

const testClientURL = "tmp://test.url:5255"
Expand Down
25 changes: 13 additions & 12 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,19 @@ const (

// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader from %v error", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
)

// grpcutil errors
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a h1:PWkMSJSDaOuLNKCV84K3tQ9stZuZPN8E148jRPD9TcA=
github.com/pingcap/kvproto v0.0.0-20230407040905-68d0eebd564a/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
111 changes: 85 additions & 26 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const (
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
)

type serviceType int

const (
apiService serviceType = iota
tsoService
)

// ServiceDiscovery defines the general interface for service discovery on a quorum-based cluster
// or a primary/secondary configured cluster.
type ServiceDiscovery interface {
Expand All @@ -50,8 +57,14 @@ type ServiceDiscovery interface {
Close()
// GetClusterID returns the ID of the cluster
GetClusterID() uint64
// GetURLs returns the URLs of the servers.
GetURLs() []string
// GetKeyspaceID returns the ID of the keyspace
GetKeyspaceID() uint32
// GetKeyspaceGroupID returns the ID of the keyspace group
GetKeyspaceGroupID() uint32
// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
DiscoverMicroservice(svcType serviceType) []string
// GetServiceURLs returns the URLs of the servers providing the service
GetServiceURLs() []string
// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint
// which is the leader in a quorum-based cluster or the primary in a primary/secondary
// configured cluster.
Expand Down Expand Up @@ -174,7 +187,9 @@ func (c *pdServiceDiscovery) Init() error {
}
log.Info("[pd] init cluster id", zap.Uint64("cluster-id", c.clusterID))

c.updateServiceMode()
if err := c.checkServiceModeChanged(); err != nil {
log.Warn("[pd] failed to check service mode and will check later", zap.Error(err))
}

c.wg.Add(2)
go c.updateMemberLoop()
Expand Down Expand Up @@ -218,7 +233,7 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
failpoint.Continue()
})
if err := c.updateMember(); err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetURLs()), errs.ZapError(err))
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
}
}
}
Expand All @@ -240,7 +255,11 @@ func (c *pdServiceDiscovery) updateServiceModeLoop() {
return
case <-ticker.C:
}
c.updateServiceMode()
if err := c.checkServiceModeChanged(); err != nil {
log.Error("[pd] failed to update service mode",
zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
c.ScheduleCheckMemberChanged() // check if the leader changed
}
}
}

Expand All @@ -263,13 +282,50 @@ func (c *pdServiceDiscovery) GetClusterID() uint64 {
return c.clusterID
}

// GetURLs returns the URLs of the servers.
// GetKeyspaceID returns the ID of the keyspace
func (c *pdServiceDiscovery) GetKeyspaceID() uint32 {
// PD/API service only supports the default keyspace
return defaultKeyspaceID
}

// GetKeyspaceGroupID returns the ID of the keyspace group
func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 {
// PD/API service only supports the default keyspace group
return defaultKeySpaceGroupID
}

// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string) {
switch svcType {
case apiService:
urls = c.GetServiceURLs()
case tsoService:
leaderAddr := c.getLeaderAddr()
if len(leaderAddr) > 0 {
clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout)
if err != nil {
log.Error("[pd] failed to get cluster info",
zap.String("leader-addr", leaderAddr), errs.ZapError(err))
return nil
}
urls = clusterInfo.TsoUrls
} else {
log.Error("[pd] failed to get leader addr")
}
default:
panic("invalid service type")
}

return urls
}

// GetServiceURLs returns the URLs of the servers.
// For testing use. It should only be called when the client is closed.
func (c *pdServiceDiscovery) GetURLs() []string {
func (c *pdServiceDiscovery) GetServiceURLs() []string {
return c.urls.Load().([]string)
}

// GetServingAddr returns the grpc client connection of the serving endpoint
// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint
// which is the leader in a quorum-based cluster or the primary in a primary/secondary
// configured cluster.
func (c *pdServiceDiscovery) GetServingEndpointClientConn() *grpc.ClientConn {
Expand Down Expand Up @@ -360,7 +416,7 @@ func (c *pdServiceDiscovery) initClusterID() error {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
clusterID := uint64(0)
for _, url := range c.GetURLs() {
for _, url := range c.GetServiceURLs() {
members, err := c.getMembers(ctx, url, c.option.timeout)
if err != nil || members.GetHeader() == nil {
log.Warn("[pd] failed to get cluster id", zap.String("url", url), errs.ZapError(err))
Expand All @@ -386,29 +442,32 @@ func (c *pdServiceDiscovery) initClusterID() error {
return nil
}

func (c *pdServiceDiscovery) updateServiceMode() {
func (c *pdServiceDiscovery) checkServiceModeChanged() error {
leaderAddr := c.getLeaderAddr()
if len(leaderAddr) > 0 {
clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout)
// If the method is not supported, we set it to pd mode.
if err != nil {
if len(leaderAddr) == 0 {
return errors.New("no leader found")
}

clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout)
if err != nil {
if strings.Contains(err.Error(), "Unimplemented") {
// If the method is not supported, we set it to pd mode.
// TODO: it's a hack way to solve the compatibility issue.
// we need to remove this after all maintained version supports the method.
if strings.Contains(err.Error(), "Unimplemented") {
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
} else {
log.Warn("[pd] failed to get cluster info for the leader", zap.String("leader-addr", leaderAddr), errs.ZapError(err))
}
return
c.serviceModeUpdateCb(pdpb.ServiceMode_PD_SVC_MODE)
return nil
}
c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])
} else {
log.Warn("[pd] no leader found")
return err
}
if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 {
return errors.WithStack(errNoServiceModeReturned)
}
c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to validate the ServiceModes filed to make sure it has at least one element?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if clusterInfo == nil || len(clusterInfo.ServiceModes) == 0 {
	return errors.WithStack(errNoServiceModeReturned)
}
c.serviceModeUpdateCb(clusterInfo.ServiceModes[0])

return nil
}

func (c *pdServiceDiscovery) updateMember() error {
for i, url := range c.GetURLs() {
for i, url := range c.GetServiceURLs() {
failpoint.Inject("skipFirstUpdateMember", func() {
if i == 0 {
failpoint.Continue()
Expand All @@ -424,7 +483,7 @@ func (c *pdServiceDiscovery) updateMember() error {
var errTSO error
if err == nil {
if members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
err = errs.ErrClientGetLeader.FastGenByArgs("leader address don't exist")
err = errs.ErrClientGetLeader.FastGenByArgs("leader address doesn't exist")
}
// Still need to update TsoAllocatorLeaders, even if there is no PD leader
errTSO = c.switchTSOAllocatorLeaders(members.GetTsoAllocatorLeaders())
Expand Down Expand Up @@ -501,7 +560,7 @@ func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) {
}

sort.Strings(urls)
oldURLs := c.GetURLs()
oldURLs := c.GetServiceURLs()
// the url list is same.
if reflect.DeepEqual(oldURLs, urls) {
return
Expand Down
1 change: 0 additions & 1 deletion client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type tsoRequest struct {
done chan error
physical int64
logical int64
keyspaceID uint32
dcLocation string
}

Expand Down
Loading