Skip to content

Commit

Permalink
Handle compatibility issue in GetClusterInfo RPC (tikv#6434)
Browse files Browse the repository at this point in the history
ref tikv#5895, close tikv#6448

Handle the compatibility issue in the GetClusterInfo RPC

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored and rleungx committed Aug 2, 2023
1 parent 6d79e83 commit 9d47a9f
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 33 deletions.
11 changes: 6 additions & 5 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type ServiceDiscovery interface {
// GetKeyspaceGroupID returns the ID of the keyspace group
GetKeyspaceGroupID() uint32
// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
DiscoverMicroservice(svcType serviceType) []string
DiscoverMicroservice(svcType serviceType) ([]string, error)
// GetServiceURLs returns the URLs of the servers providing the service
GetServiceURLs() []string
// GetServingEndpointClientConn returns the grpc client connection of the serving endpoint
Expand Down Expand Up @@ -299,7 +299,7 @@ func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 {
}

// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string) {
func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
case apiService:
urls = c.GetServiceURLs()
Expand All @@ -310,17 +310,18 @@ func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []s
if err != nil {
log.Error("[pd] failed to get cluster info",
zap.String("leader-addr", leaderAddr), errs.ZapError(err))
return nil
return nil, err
}
urls = clusterInfo.TsoUrls
} else {
log.Error("[pd] failed to get leader addr")
err = errors.New("failed to get leader addr")
return nil, err
}
default:
panic("invalid service type")
}

return urls
return urls, nil
}

// GetServiceURLs returns the URLs of the servers.
Expand Down
114 changes: 86 additions & 28 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
Expand Down Expand Up @@ -96,31 +98,6 @@ type tsoServerDiscovery struct {
failureCount int
}

func (t *tsoServerDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) {
t.Lock()
defer t.Unlock()

if len(t.addrs) == 0 || t.failureCount == len(t.addrs) {
addrs := sd.DiscoverMicroservice(tsoService)
if len(addrs) == 0 {
return "", errors.New("no tso server address found")
}

log.Info("update tso server addresses", zap.Strings("addrs", addrs))

t.addrs = addrs
t.selectIdx = 0
t.failureCount = 0
}

// Pick a TSO server in a round-robin way.
tsoServerAddr := t.addrs[t.selectIdx]
t.selectIdx++
t.selectIdx %= len(t.addrs)

return tsoServerAddr, nil
}

func (t *tsoServerDiscovery) countFailure() {
t.Lock()
defer t.Unlock()
Expand Down Expand Up @@ -301,7 +278,7 @@ func (c *tsoServiceDiscovery) GetKeyspaceGroupID() uint32 {
}

// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
func (c *tsoServiceDiscovery) DiscoverMicroservice(svcType serviceType) []string {
func (c *tsoServiceDiscovery) DiscoverMicroservice(svcType serviceType) ([]string, error) {
var urls []string

switch svcType {
Expand All @@ -312,7 +289,7 @@ func (c *tsoServiceDiscovery) DiscoverMicroservice(svcType serviceType) []string
panic("invalid service type")
}

return urls
return urls, nil
}

// GetServiceURLs returns the URLs of the tso primary/secondary addresses of this keyspace group.
Expand Down Expand Up @@ -430,7 +407,7 @@ func (c *tsoServiceDiscovery) afterPrimarySwitched(oldPrimary, newPrimary string
func (c *tsoServiceDiscovery) updateMember() error {
// The keyspace membership or the primary serving address of the keyspace group, to which this
// keyspace belongs, might have been changed. We need to query tso servers to get the latest info.
tsoServerAddr, err := c.tsoServerDiscovery.getTSOServer(c.apiSvcDiscovery)
tsoServerAddr, err := c.getTSOServer(c.apiSvcDiscovery)
if err != nil {
log.Error("[tso] failed to get tso server", errs.ZapError(err))
return err
Expand Down Expand Up @@ -526,3 +503,84 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID(

return resp.KeyspaceGroup, nil
}

func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) {
c.Lock()
defer c.Unlock()

var (
addrs []string
err error
)
t := c.tsoServerDiscovery
if len(t.addrs) == 0 || t.failureCount == len(t.addrs) {
addrs, err = sd.DiscoverMicroservice(tsoService)
if err != nil {
return "", err
}
failpoint.Inject("serverReturnsNoTSOAddrs", func() {
log.Info("[failpoint] injected error: server returns no tso addrs")
addrs = nil
})
if len(addrs) == 0 {
// There is no error but no tso server address found, which means
// either the server side is experiencing some problems to get the
// tso primary addresses or the server hasn't been upgraded to the
// version which processes and returns GetClusterInfoResponse.TsoUrls.
// In this case, we fall back to the old way of discovering the tso
// primary addresses from etcd directly.
log.Warn("[tso] no tso server address found,"+
" fallback to the legacy path to discover from etcd directly",
zap.String("discovery-key", c.defaultDiscoveryKey))
addrs, err = c.discoverWithLegacyPath()
if err != nil {
return "", err
}
if len(addrs) == 0 {
return "", errors.New("no tso server address found")
}
}

log.Info("update tso server addresses", zap.Strings("addrs", addrs))

t.addrs = addrs
t.selectIdx = 0
t.failureCount = 0
}

// Pick a TSO server in a round-robin way.
tsoServerAddr := t.addrs[t.selectIdx]
t.selectIdx++
t.selectIdx %= len(t.addrs)

return tsoServerAddr, nil
}

func (c *tsoServiceDiscovery) discoverWithLegacyPath() ([]string, error) {
resp, err := c.metacli.Get(c.ctx, []byte(c.defaultDiscoveryKey))
if err != nil {
log.Error("[tso] failed to get the keyspace serving endpoint",
zap.String("discovery-key", c.defaultDiscoveryKey), errs.ZapError(err))
return nil, err
}
if resp == nil || len(resp.Kvs) == 0 {
log.Error("[tso] didn't find the keyspace serving endpoint",
zap.String("primary-key", c.defaultDiscoveryKey))
return nil, errs.ErrClientGetServingEndpoint
} else if resp.Count > 1 {
return nil, errs.ErrClientGetMultiResponse.FastGenByArgs(resp.Kvs)
}

value := resp.Kvs[0].Value
primary := &tsopb.Participant{}
if err := proto.Unmarshal(value, primary); err != nil {
return nil, errs.ErrClientProtoUnmarshal.Wrap(err).GenWithStackByCause()
}
listenUrls := primary.GetListenUrls()
if len(listenUrls) == 0 {
log.Error("[tso] the keyspace serving endpoint list is empty",
zap.String("discovery-key", c.defaultDiscoveryKey))
return nil, errs.ErrClientGetServingEndpoint
}
return listenUrls, nil
}
28 changes: 28 additions & 0 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,34 @@ func (suite *tsoClientTestSuite) TestGetTSAsync() {
wg.Wait()
}

func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() {
re := suite.Require()
// Simulate the case that the server has lower version than the client and returns no tso addrs
// in the GetClusterInfo RPC.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs", `return(true)`))
defer func() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/serverReturnsNoTSOAddrs"))
}()
var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber)
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
go func() {
defer wg.Done()
client := mcs.SetupClientWithDefaultKeyspaceName(
suite.ctx, re, strings.Split(suite.backendEndpoints, ","))
var lastTS uint64
for j := 0; j < tsoRequestRound; j++ {
physical, logical, err := client.GetTS(suite.ctx)
suite.NoError(err)
ts := tsoutil.ComposeTS(physical, logical)
suite.Less(lastTS, ts)
lastTS = ts
}
}()
}
wg.Wait()
}

// More details can be found in this issue: https://github.com/tikv/pd/issues/4884
func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
re := suite.Require()
Expand Down

0 comments on commit 9d47a9f

Please sign in to comment.