Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#6995
Browse files Browse the repository at this point in the history
ref tikv#6949

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
HuSharp authored and ti-chi-bot committed Aug 31, 2023
1 parent 4bcd2ab commit 6437dd8
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 8 deletions.
65 changes: 60 additions & 5 deletions client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/tlsutil"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
globalDCLocation = "global"
memberUpdateInterval = time.Minute
serviceModeUpdateInterval = 3 * time.Second
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
updateMemberBackOffBaseTime = 100 * time.Millisecond
)

// ServiceDiscovery defines the general interface for service discovery on a quorum-based cluster
Expand Down Expand Up @@ -207,18 +209,25 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
for {
select {
case <-ctx.Done():
log.Info("[pd] exit member loop due to context canceled")
return
case <-ticker.C:
case <-c.checkMembershipCh:
}
failpoint.Inject("skipUpdateMember", func() {
failpoint.Continue()
})
<<<<<<< HEAD
if err := c.updateMember(); err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetURLs()), errs.ZapError(err))
=======
if err := bo.Exec(ctx, c.updateMember); err != nil {
log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err))
>>>>>>> 9a574ed56 (client: add backoff for `member loop` (#6995))
}
}
}
Expand Down Expand Up @@ -263,7 +272,53 @@ func (c *pdServiceDiscovery) GetClusterID() uint64 {
return c.clusterID
}

<<<<<<< HEAD
// GetURLs returns the URLs of the servers.
=======
// GetKeyspaceID returns the ID of the keyspace
func (c *pdServiceDiscovery) GetKeyspaceID() uint32 {
return c.keyspaceID
}

// SetKeyspaceID sets the ID of the keyspace
func (c *pdServiceDiscovery) SetKeyspaceID(keyspaceID uint32) {
c.keyspaceID = keyspaceID
}

// GetKeyspaceGroupID returns the ID of the keyspace group
func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 {
// PD/API service only supports the default keyspace group
return defaultKeySpaceGroupID
}

// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
case apiService:
urls = c.GetServiceURLs()
case tsoService:
leaderAddr := c.getLeaderAddr()
if len(leaderAddr) > 0 {
clusterInfo, err := c.getClusterInfo(c.ctx, leaderAddr, c.option.timeout)
if err != nil {
log.Error("[pd] failed to get cluster info",
zap.String("leader-addr", leaderAddr), errs.ZapError(err))
return nil, err
}
urls = clusterInfo.TsoUrls
} else {
err = errors.New("failed to get leader addr")
return nil, err
}
default:
panic("invalid service type")
}

return urls, nil
}

// GetServiceURLs returns the URLs of the servers.
>>>>>>> 9a574ed56 (client: add backoff for `member loop` (#6995))
// For testing use. It should only be called when the client is closed.
func (c *pdServiceDiscovery) GetURLs() []string {
return c.urls.Load().([]string)
Expand Down Expand Up @@ -304,7 +359,7 @@ func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() {
}
}

// Immediately check if there is any membership change among the leader/followers in a
// CheckMemberChanged Immediately check if there is any membership change among the leader/followers in a
// quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
func (c *pdServiceDiscovery) CheckMemberChanged() error {
return c.updateMember()
Expand Down
86 changes: 86 additions & 0 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"time"

"github.com/pingcap/failpoint"
)

// BackOffer is a backoff policy for retrying operations.
type BackOffer struct {
max time.Duration
next time.Duration
base time.Duration
}

// Exec is a helper function to exec backoff.
func (bo *BackOffer) Exec(
ctx context.Context,
fn func() error,
) error {
if err := fn(); err != nil {
select {
case <-ctx.Done():
case <-time.After(bo.nextInterval()):
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
})
}
return err
}
// reset backoff when fn() succeed.
bo.resetBackoff()
return nil
}

// InitialBackOffer make the initial state for retrying.
func InitialBackOffer(base, max time.Duration) BackOffer {
return BackOffer{
max: max,
base: base,
next: base,
}
}

// nextInterval for now use the `exponentialInterval`.
func (bo *BackOffer) nextInterval() time.Duration {
return bo.exponentialInterval()
}

// exponentialInterval returns the exponential backoff duration.
func (bo *BackOffer) exponentialInterval() time.Duration {
backoffInterval := bo.next
bo.next *= 2
if bo.next > bo.max {
bo.next = bo.max
}
return backoffInterval
}

// resetBackoff resets the backoff to initial state.
func (bo *BackOffer) resetBackoff() {
bo.next = bo.base
}

// Only used for test.
var testBackOffExecuteFlag = false

// TestBackOffExecute Only used for test.
func TestBackOffExecute() bool {
return testBackOffExecuteFlag
}
47 changes: 47 additions & 0 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"context"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/stretchr/testify/require"
)

func TestExponentialBackoff(t *testing.T) {
re := require.New(t)

baseBackoff := 100 * time.Millisecond
maxBackoff := 1 * time.Second

backoff := InitialBackOffer(baseBackoff, maxBackoff)
re.Equal(backoff.nextInterval(), baseBackoff)
re.Equal(backoff.nextInterval(), 2*baseBackoff)

for i := 0; i < 10; i++ {
re.LessOrEqual(backoff.nextInterval(), maxBackoff)
}
re.Equal(backoff.nextInterval(), maxBackoff)

// Reset backoff
backoff.resetBackoff()
err := backoff.Exec(context.Background(), func() error {
return errors.New("test")
})
re.Error(err)
}
13 changes: 12 additions & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
<<<<<<< HEAD
=======
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/timerpool"
"github.com/tikv/pd/client/tsoutil"
>>>>>>> 9a574ed56 (client: add backoff for `member loop` (#6995))
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -361,6 +367,11 @@ func (c *tsoClient) handleDispatcher(

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
<<<<<<< HEAD
=======
defer streamLoopTimer.Stop()
bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
>>>>>>> 9a574ed56 (client: add backoff for `member loop` (#6995))
tsoBatchLoop:
for {
select {
Expand Down Expand Up @@ -457,7 +468,7 @@ tsoBatchLoop:
stream = nil
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if IsLeaderChange(err) {
if err := c.svcDiscovery.CheckMemberChanged(); err != nil {
if err := bo.Exec(dispatcherCtx, c.svcDiscovery.CheckMemberChanged); err != nil {
select {
case <-dispatcherCtx.Done():
return
Expand Down
2 changes: 1 addition & 1 deletion client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() {
}
}

// Immediately check if there is any membership change among the primary/secondaries in
// CheckMemberChanged Immediately check if there is any membership change among the primary/secondaries in
// a primary/secondary configured cluster.
func (c *tsoServiceDiscovery) CheckMemberChanged() error {
return c.updateMember()
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,7 +1475,7 @@ func (s *Server) leaderLoop() {
if s.member.GetLeader() == nil {
lastUpdated := s.member.GetLastLeaderUpdatedTime()
// use random timeout to avoid leader campaigning storm.
randomTimeout := time.Duration(rand.Intn(int(lostPDLeaderMaxTimeoutSecs)))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
// add failpoint to test the campaign leader logic.
failpoint.Inject("timeoutWaitPDLeader", func() {
log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
Expand Down
49 changes: 49 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mock/mockid"
Expand Down Expand Up @@ -1506,3 +1507,51 @@ func TestClientWatchWithRevision(t *testing.T) {
}
}
}

func (suite *clientTestSuite) TestMemberUpdateBackOff() {
re := suite.Require()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 3)
re.NoError(err)
defer cluster.Destroy()

endpoints := runServer(re, cluster)
cli := setupCli(re, ctx, endpoints)
defer cli.Close()
innerCli, ok := cli.(interface{ GetServiceDiscovery() pd.ServiceDiscovery })
re.True(ok)

leader := cluster.GetLeader()
waitLeader(re, innerCli.GetServiceDiscovery(), cluster.GetServer(leader).GetConfig().ClientUrls)
memberID := cluster.GetServer(leader).GetLeader().GetMemberId()

re.NoError(failpoint.Enable("github.com/tikv/pd/server/leaderLoopCheckAgain", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/exitCampaignLeader", fmt.Sprintf("return(\"%d\")", memberID)))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/timeoutWaitPDLeader", `return(true)`))
// make sure back off executed.
re.NoError(failpoint.Enable("github.com/tikv/pd/client/retry/backOffExecute", `return(true)`))
leader2 := waitLeaderChange(re, cluster, leader, innerCli.GetServiceDiscovery())
re.True(retry.TestBackOffExecute())

re.NotEqual(leader, leader2)

re.NoError(failpoint.Disable("github.com/tikv/pd/server/leaderLoopCheckAgain"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/exitCampaignLeader"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/timeoutWaitPDLeader"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/retry/backOffExecute"))
}

func waitLeaderChange(re *require.Assertions, cluster *tests.TestCluster, old string, cli pd.ServiceDiscovery) string {
var leader string
testutil.Eventually(re, func() bool {
cli.ScheduleCheckMemberChanged()
leader = cluster.GetLeader()
if leader == old || leader == "" {
return false
}
return true
})
return leader
}

0 comments on commit 6437dd8

Please sign in to comment.