Skip to content

Commit

Permalink
client: revise client if mismatch leader id (#3432)
Browse files Browse the repository at this point in the history
* revise client

Signed-off-by: Song Gao <[email protected]>

* revise client

Signed-off-by: Song Gao <[email protected]>

* address the comment

Signed-off-by: Song Gao <[email protected]>

* add comment

Signed-off-by: Song Gao <[email protected]>

* address the comment

Signed-off-by: Song Gao <[email protected]>

* address the comment

Signed-off-by: Song Gao <[email protected]>

* address the comment

Signed-off-by: Song Gao <[email protected]>

* address the comment

Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Feb 23, 2021
1 parent d28e248 commit ba729c9
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 7 deletions.
33 changes: 27 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,13 @@ func (c *client) createTSODispatcher(dcLocation string) {

func (c *client) handleDispatcher(loopCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {
var (
err error
ctx context.Context
cancel context.CancelFunc
stream pdpb.PD_TsoClient
opts []opentracing.StartSpanOption
requests = make([]*tsoRequest, maxMergeTSORequests+1)
err error
ctx context.Context
cancel context.CancelFunc
stream pdpb.PD_TsoClient
opts []opentracing.StartSpanOption
requests = make([]*tsoRequest, maxMergeTSORequests+1)
needUpdate = false
)
defer func() {
if cancel != nil {
Expand All @@ -363,6 +364,19 @@ func (c *client) handleDispatcher(loopCtx context.Context, dc string, tsoDispatc
// If the tso stream for the corresponding dc-location has not been created yet or needs to be re-created,
// we will try to create the stream first.
if stream == nil {
if needUpdate {
err = c.updateLeader()
if err != nil {
select {
case <-loopCtx.Done():
return
default:
}
log.Error("[pd] failed updateLeader", errs.ZapError(err))
continue
}
needUpdate = false
}
ctx, cancel = context.WithCancel(loopCtx)
done := make(chan struct{})
go c.checkStreamTimeout(ctx, cancel, done)
Expand Down Expand Up @@ -427,6 +441,9 @@ func (c *client) handleDispatcher(loopCtx context.Context, dc string, tsoDispatc
c.ScheduleCheckLeader()
cancel()
stream = nil
if isMismatchLeader(err) {
needUpdate = true
}
}
}
}
Expand Down Expand Up @@ -1066,3 +1083,7 @@ func addrsToUrls(addrs []string) []string {
}
return urls
}

func isMismatchLeader(err error) bool {
return strings.Contains(err.Error(), errs.MismatchLeaderErr)
}
3 changes: 3 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ package errs

import "github.com/pingcap/errors"

// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader
const MismatchLeaderErr = "mismatch leader id"

// common error in multiple packages
var (
ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:common:ErrGetSourceStore"))
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@ func (s *Server) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLe
if onlyAllowLeader {
leaderID := s.GetLeader().GetMemberId()
if leaderID != header.GetSenderId() {
return status.Errorf(codes.FailedPrecondition, "mismatch leader id, need %d but got %d", leaderID, header.GetSenderId())
return status.Errorf(codes.FailedPrecondition, "%s, need %d but got %d", errs.MismatchLeaderErr, leaderID, header.GetSenderId())
}
}
return nil
Expand Down
11 changes: 11 additions & 0 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math"
"path"
"sort"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -331,6 +332,16 @@ func (s *clientTestSuite) TestGlobalAndLocalTSO(c *C) {
c.Assert(p, Equals, int64(0))
c.Assert(l, Equals, int64(0))
c.Assert(err, NotNil)

// assert global tso after resign leader
err = cluster.ResignLeader()
c.Assert(err, IsNil)
cluster.WaitLeader()
_, _, err = cli.GetTS(s.ctx)
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "mismatch leader id"), Equals, true)
_, _, err = cli.GetTS(s.ctx)
c.Assert(err, IsNil)
}

func (s *clientTestSuite) TestCustomTimeout(c *C) {
Expand Down

0 comments on commit ba729c9

Please sign in to comment.