client: fix Stream timeout logic (tikv#5551) (tikv#5584)
close tikv#5207, ref tikv#5551

fix Stream timeout logic

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Cabinfever_B <[email protected]>

Co-authored-by: Yongbo Jiang <[email protected]>
Co-authored-by: Cabinfever_B <[email protected]>
ti-chi-bot and CabinfeverB authored Oct 21, 2022
1 parent b599fad commit cb1af02
Showing 2 changed files with 86 additions and 21 deletions.
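Before the diffs, a quick illustration of the pattern the fix moves to: instead of accumulating retry time in a `retryTimeConsuming` counter (which only counted the fixed one-second sleeps, not the time spent re-creating connections), the dispatcher now arms a single `time.Timer` per batch and selects on it while retrying. A minimal, self-contained sketch of that pattern, using hypothetical names (`waitForStream`, `buildStream`) rather than the actual PD client API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForStream retries buildStream until it succeeds, the overall deadline
// fires, or ctx is canceled. A single timer bounds the *total* wait, including
// the time spent inside buildStream itself.
func waitForStream(ctx context.Context, timeout, retryInterval time.Duration,
	buildStream func() (string, error)) (string, error) {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	for {
		if s, err := buildStream(); err == nil {
			return s, nil
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-deadline.C:
			return "", errors.New("create tso stream timeout")
		case <-time.After(retryInterval):
			// keep retrying until the deadline fires
		}
	}
}

func main() {
	attempts := 0
	stream, err := waitForStream(context.Background(), 3*time.Second, 500*time.Millisecond,
		func() (string, error) {
			attempts++
			if attempts < 3 {
				return "", errors.New("stream not ready")
			}
			return "stream-to-leader", nil
		})
	fmt.Println(stream, err)
}
```

In the real dispatcher the timer is `streamLoopTimer`, reset to `c.option.timeout` at the top of every `tsoBatchLoop` iteration, so each batch gets the full timeout regardless of what earlier batches consumed.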
47 changes: 26 additions & 21 deletions client/client.go
@@ -304,8 +304,8 @@ const (
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
tsLoopDCCheckInterval = time.Minute
defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
retryInterval = 1 * time.Second
maxRetryTimes = 5
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
)

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
@@ -678,12 +678,11 @@ func (c *client) handleDispatcher(
dc string,
tbc *tsoBatchController) {
var (
retryTimeConsuming time.Duration
err error
streamAddr string
stream pdpb.PD_TsoClient
streamCtx context.Context
cancel context.CancelFunc
err error
streamAddr string
stream pdpb.PD_TsoClient
streamCtx context.Context
cancel context.CancelFunc
// addr -> connectionContext
connectionCtxs sync.Map
opts []opentracing.StartSpanOption
@@ -740,6 +739,7 @@ func (c *client) handleDispatcher(
}

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
tsoBatchLoop:
for {
select {
@@ -756,6 +756,7 @@ tsoBatchLoop:
if maxBatchWaitInterval >= 0 {
tbc.adjustBestBatchSize()
}
streamLoopTimer.Reset(c.option.timeout)
// Choose a stream to send the TSO gRPC request.
streamChoosingLoop:
for {
@@ -766,24 +767,22 @@ tsoBatchLoop:
// Check stream and retry if necessary.
if stream == nil {
log.Info("[pd] tso stream is not ready", zap.String("dc", dc))
c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs)
if retryTimeConsuming >= c.option.timeout {
if c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
continue streamChoosingLoop
}
select {
case <-dispatcherCtx.Done():
return
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs()
log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.ScheduleCheckLeader()
c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
retryTimeConsuming = 0
continue tsoBatchLoop
}
select {
case <-dispatcherCtx.Done():
return
case <-time.After(time.Second):
retryTimeConsuming += time.Second
continue
case <-time.After(retryInterval):
continue streamChoosingLoop
}
}
retryTimeConsuming = 0
select {
case <-streamCtx.Done():
log.Info("[pd] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr))
@@ -877,15 +876,17 @@ type connectionContext struct {
cancel context.CancelFunc
}

func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) {
func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
// Normal connection creating, it will be affected by the `enableForwarding`.
createTSOConnection := c.tryConnect
if c.allowTSOFollowerProxy(dc) {
createTSOConnection = c.tryConnectWithProxy
}
if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
log.Error("[pd] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
return false
}
return true
}

// tryConnect will try to connect to the TSO allocator leader. If the connection becomes unreachable
@@ -901,6 +902,8 @@ func (c *client) tryConnect(
networkErrNum uint64
err error
stream pdpb.PD_TsoClient
url string
cc *grpc.ClientConn
)
updateAndClear := func(newAddr string, connectionCtx *connectionContext) {
if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded {
@@ -916,9 +919,11 @@ func (c *client) tryConnect(
return true
})
}
cc, url := c.getAllocatorClientConnByDCLocation(dc)
// retry several times before falling back to the follower when the network problem happens

for i := 0; i < maxRetryTimes; i++ {
c.ScheduleCheckLeader()
cc, url = c.getAllocatorClientConnByDCLocation(dc)
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.createTsoStream(cctx, cancel, pdpb.NewPDClient(cc))
failpoint.Inject("unreachableNetwork", func() {
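The tryConnect change above moves getAllocatorClientConnByDCLocation inside the retry loop and schedules a leader check on every attempt, with a worst-case in-loop retry budget of maxRetryTimes × retryInterval = 6 × 500 ms = 3 s (down from 5 × 1 s). A rough sketch of that re-resolve-then-dial shape under hypothetical names (`connectWithRefresh`, `resolve`, `dial`), not the PD client itself:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const (
	retryInterval = 500 * time.Millisecond
	maxRetryTimes = 6
)

// connectWithRefresh re-resolves the target before every attempt, so a leader
// switch during the retry window is picked up immediately instead of dialing a
// stale address. Worst-case in-loop wait: maxRetryTimes * retryInterval = 3s.
func connectWithRefresh(ctx context.Context, resolve func() string, dial func(addr string) error) error {
	var err error
	for i := 0; i < maxRetryTimes; i++ {
		addr := resolve() // e.g. the freshly elected TSO allocator leader
		if err = dial(addr); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryInterval):
		}
	}
	return err // caller can fall back to a follower proxy at this point
}

func main() {
	addrs := []string{"stale-leader:2379", "stale-leader:2379", "new-leader:2379"}
	i := 0
	resolve := func() string {
		addr := addrs[i]
		if i < len(addrs)-1 {
			i++
		}
		return addr
	}
	dial := func(addr string) error {
		if addr != "new-leader:2379" {
			return errors.New("connection refused")
		}
		return nil
	}
	fmt.Println("err:", connectWithRefresh(context.Background(), resolve, dial))
}
```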
60 changes: 60 additions & 0 deletions tests/client/client_test.go
@@ -316,6 +316,66 @@ func (s *clientTestSuite) TestTSOFollowerProxy(c *C) {
wg.Wait()
}

// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
func (s *clientTestSuite) TestUnavailableTimeAfterLeaderIsReady(c *C) {
cluster, err := tests.NewTestCluster(s.ctx, 3)
c.Assert(err, IsNil)
defer cluster.Destroy()

endpoints := s.runServer(c, cluster)
cli := setupCli(c, s.ctx, endpoints)

var wg sync.WaitGroup
var maxUnavailableTime, leaderReadyTime time.Time
getTsoFunc := func() {
defer wg.Done()
var lastTS uint64
for i := 0; i < tsoRequestRound; i++ {
var physical, logical int64
var ts uint64
physical, logical, err = cli.GetTS(context.Background())
ts = tsoutil.ComposeTS(physical, logical)
if err != nil {
maxUnavailableTime = time.Now()
continue
}
c.Assert(err, IsNil)
c.Assert(lastTS, Less, ts)
lastTS = ts
}
}

// test resign pd leader or stop pd leader
wg.Add(1 + 1)
go getTsoFunc()
go func() {
defer wg.Done()
leader := cluster.GetServer(cluster.GetLeader())
leader.Stop()
cluster.WaitLeader()
leaderReadyTime = time.Now()
cluster.RunServers([]*tests.TestServer{leader})
}()
wg.Wait()
c.Assert(maxUnavailableTime.Nanosecond(), Less, leaderReadyTime.Add(1*time.Second).Nanosecond())

// test kill pd leader pod or network of leader is unreachable
wg.Add(1 + 1)
maxUnavailableTime, leaderReadyTime = time.Time{}, time.Time{}
go getTsoFunc()
go func() {
defer wg.Done()
leader := cluster.GetServer(cluster.GetLeader())
c.Assert(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"), IsNil)
leader.Stop()
cluster.WaitLeader()
c.Assert(failpoint.Disable("github.com/tikv/pd/client/unreachableNetwork"), IsNil)
leaderReadyTime = time.Now()
}()
wg.Wait()
c.Assert(maxUnavailableTime.Nanosecond(), Less, leaderReadyTime.Add(1*time.Second).Nanosecond())
}

func (s *clientTestSuite) TestGlobalAndLocalTSO(c *C) {
dcLocationConfig := map[string]string{
"pd1": "dc-1",
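The new test asserts that timestamps stay monotonic across the leader switch by composing the physical and logical parts into a single comparable value with tsoutil.ComposeTS. A minimal sketch of that kind of composition, assuming PD's usual layout of a millisecond physical timestamp plus an 18-bit logical counter (`composeTS` here is illustrative, not the tsoutil implementation):

```go
package main

import "fmt"

// Assumed TSO layout: physical milliseconds in the high bits and an 18-bit
// logical counter in the low bits, so two timestamps can be compared with a
// single integer comparison.
const logicalBits = 18

func composeTS(physical, logical int64) uint64 {
	return uint64(physical)<<logicalBits + uint64(logical)
}

func main() {
	a := composeTS(1_666_000_000_000, 100)
	b := composeTS(1_666_000_000_000, 101) // same millisecond, later logical tick
	c := composeTS(1_666_000_000_001, 0)   // next millisecond
	fmt.Println(a < b, b < c)              // true true: composition preserves ordering
}
```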
