Skip to content

Commit

Permalink
support add ready for resp
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Aug 22, 2023
1 parent 346e771 commit 72419ac
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 19 deletions.
9 changes: 4 additions & 5 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,10 @@ func TestGRPCDialOption(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond)
defer cancel()
cli := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
tlsCfg: &tlsutil.TLSConfig{},
option: newOption(),
ctx: ctx,
cancel: cancel,
tlsCfg: &tlsutil.TLSConfig{},
option: newOption(),
}
cli.urls.Store([]string{testClientURL})
cli.option.gRPCDialOptions = []grpc.DialOption{grpc.WithBlock()}
Expand Down
103 changes: 90 additions & 13 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ import (
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

const (
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
requestTimeout = 2 * time.Second
)

type serviceType int
Expand All @@ -61,7 +63,7 @@ type ServiceDiscovery interface {
GetKeyspaceID() uint32
// GetKeyspaceGroupID returns the ID of the keyspace group
GetKeyspaceGroupID() uint32
// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
DiscoverMicroservice(svcType serviceType) ([]string, error)
// GetServiceURLs returns the URLs of the servers providing the service
GetServiceURLs() []string
Expand Down Expand Up @@ -141,8 +143,6 @@ type pdServiceDiscovery struct {
// leader is updated.
tsoGlobalAllocLeaderUpdatedCb tsoGlobalServAddrUpdatedFunc

checkMembershipCh chan struct{}

wg *sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
Expand All @@ -153,6 +153,8 @@ type pdServiceDiscovery struct {
tlsCfg *tlsutil.TLSConfig
// Client option.
option *option

successReConnect chan struct{}
}

// newPDServiceDiscovery returns a new PD service discovery-based client.
Expand All @@ -165,7 +167,6 @@ func newPDServiceDiscovery(
urls []string, tlsCfg *tlsutil.TLSConfig, option *option,
) *pdServiceDiscovery {
pdsd := &pdServiceDiscovery{
checkMembershipCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
wg: wg,
Expand Down Expand Up @@ -207,7 +208,7 @@ func (c *pdServiceDiscovery) Init() error {
}

c.wg.Add(2)
go c.updateMemberLoop()
go c.reconnectMemberLoop()
go c.updateServiceModeLoop()

c.isInitialized = true
Expand All @@ -231,30 +232,107 @@ func (c *pdServiceDiscovery) initRetry(f func() error) error {
return errors.WithStack(err)
}

func (c *pdServiceDiscovery) updateMemberLoop() {
func (c *pdServiceDiscovery) reconnectMemberLoop() {
defer c.wg.Done()

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()
failpoint.Inject("acceleratedMemberUpdateInterval", func() {
ticker.Stop()
ticker = time.NewTicker(time.Millisecond * 100)
})

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-c.checkMembershipCh:
}

failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
})

if err := c.updateMember(); err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
log.Error("[pd] failed to update member", errs.ZapError(err))
} else {
c.SuccessReconnect()
}
}
}

func (c *pdServiceDiscovery) waitForReady() error {
if e1 := c.waitForLeaderReady(); e1 != nil {
log.Error("[pd.waitForReady] failed to wait for leader ready", errs.ZapError(e1))
return errors.WithStack(e1)
} else if e2 := c.loadMembers(); e2 != nil {
log.Error("[pd.waitForReady] failed to load members", errs.ZapError(e2))
} else {
return nil
}

deadline := time.Now().Add(requestTimeout)
for {
select {
case <-c.successReConnect:
return nil
case <-time.After(time.Until(deadline)):
log.Error("[pd.waitForReady] timeout")
return errors.New("wait for ready timeout")
}
}
}

// waitForLeaderReady waits for the leader to be ready.
func (c *pdServiceDiscovery) waitForLeaderReady() error {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
for {
old, ok := c.clientConns.Load(c.getLeaderAddr())
if !ok {
cancel()
return errors.New("no leader")
}
cc := old.(*grpc.ClientConn)

s := cc.GetState()
if s == connectivity.Ready {
cancel()
return nil
}
if !cc.WaitForStateChange(ctx, s) {
cancel()
// ctx got timeout or canceled.
return ctx.Err()
}
}
}

func (c *pdServiceDiscovery) loadMembers() error {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()

members, err := c.getMembers(ctx, c.getLeaderAddr(), updateMemberTimeout)
if err != nil {
log.Error("[pd.loadMembers] failed to load members ", zap.String("url", c.getLeaderAddr()), errs.ZapError(err))
return errors.WithStack(err)
} else if members.GetHeader() == nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
err = errs.ErrClientGetLeader.FastGenByArgs("leader address don't exist")
log.Error("[pd.loadMembers] leader address don't exist. ", zap.String("url", c.getLeaderAddr()), errs.ZapError(err))
return errors.WithStack(err)
}

return nil
}

func (c *pdServiceDiscovery) SuccessReconnect() {
select {
case c.successReConnect <- struct{}{}:
default:
}
}

func (c *pdServiceDiscovery) updateServiceModeLoop() {
defer c.wg.Done()
failpoint.Inject("skipUpdateServiceMode", func() {
Expand Down Expand Up @@ -319,7 +397,7 @@ func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 {
return defaultKeySpaceGroupID
}

// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls.
// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
case apiService:
Expand Down Expand Up @@ -380,13 +458,12 @@ func (c *pdServiceDiscovery) GetBackupAddrs() []string {
// ScheduleCheckMemberChanged is used to check if there is any membership
// change among the leader and the followers.
func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() {
select {
case c.checkMembershipCh <- struct{}{}:
default:
if err := c.waitForReady(); err != nil {
log.Error("[pd] failed to wait for ready", errs.ZapError(err))
}
}

// Immediately check if there is any membership change among the leader/followers in a
// CheckMemberChanged Immediately check if there is any membership change among the leader/followers in a
// quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
func (c *pdServiceDiscovery) CheckMemberChanged() error {
return c.updateMember()
Expand Down
2 changes: 1 addition & 1 deletion client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() {
}
}

// Immediately check if there is any membership change among the primary/secondaries in
// CheckMemberChanged Immediately check if there is any membership change among the primary/secondaries in
// a primary/secondary configured cluster.
func (c *tsoServiceDiscovery) CheckMemberChanged() error {
c.apiSvcDiscovery.CheckMemberChanged()
Expand Down
1 change: 1 addition & 0 deletions pkg/utils/testutil/leak.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ var LeakOptions = []goleak.Option{
goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"),
// natefinch/lumberjack#56, It's a goroutine leak bug. Another ignore option PR https://github.com/pingcap/tidb/pull/27405/
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"),
}
6 changes: 6 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func TestClientClusterIDCheck(t *testing.T) {

func TestClientLeaderChange(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`))
defer failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3)
Expand Down Expand Up @@ -312,6 +314,8 @@ func TestTSOFollowerProxy(t *testing.T) {
// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`))
defer failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3)
Expand Down Expand Up @@ -375,6 +379,8 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
// TODO: migrate the Local/Global TSO tests to TSO integration test folder.
func TestGlobalAndLocalTSO(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`))
defer failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dcLocationConfig := map[string]string{
Expand Down

0 comments on commit 72419ac

Please sign in to comment.