diff --git a/client/client.go b/client/client.go index 23b5b85810d..8563a3c6d9f 100644 --- a/client/client.go +++ b/client/client.go @@ -347,12 +347,13 @@ func (c *client) createTSODispatcher(dcLocation string) { func (c *client) handleDispatcher(loopCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) { var ( - err error - ctx context.Context - cancel context.CancelFunc - stream pdpb.PD_TsoClient - opts []opentracing.StartSpanOption - requests = make([]*tsoRequest, maxMergeTSORequests+1) + err error + ctx context.Context + cancel context.CancelFunc + stream pdpb.PD_TsoClient + opts []opentracing.StartSpanOption + requests = make([]*tsoRequest, maxMergeTSORequests+1) + needUpdate = false ) defer func() { if cancel != nil { @@ -363,6 +364,19 @@ func (c *client) handleDispatcher(loopCtx context.Context, dc string, tsoDispatc // If the tso stream for the corresponding dc-location has not been created yet or needs to be re-created, // we will try to create the stream first. if stream == nil { + if needUpdate { + err = c.updateLeader() + if err != nil { + select { + case <-loopCtx.Done(): + return + default: + } + log.Error("[pd] failed updateLeader", errs.ZapError(err)) + continue + } + needUpdate = false + } ctx, cancel = context.WithCancel(loopCtx) done := make(chan struct{}) go c.checkStreamTimeout(ctx, cancel, done) @@ -427,6 +441,9 @@ func (c *client) handleDispatcher(loopCtx context.Context, dc string, tsoDispatc c.ScheduleCheckLeader() cancel() stream = nil + if isMismatchLeader(err) { + needUpdate = true + } } } } @@ -1066,3 +1083,7 @@ func addrsToUrls(addrs []string) []string { } return urls } + +func isMismatchLeader(err error) bool { + return strings.Contains(err.Error(), errs.MismatchLeaderErr) +} diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index cfe755d4319..6c00d0d3b7b 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -15,6 +15,9 @@ package errs import "github.com/pingcap/errors" +// MismatchLeaderErr indicates that the non-leader 
member received the requests which should be received by the leader +const MismatchLeaderErr = "mismatch leader id" + // common error in multiple packages var ( ErrGetSourceStore = errors.Normalize("failed to get the source store", errors.RFCCodeText("PD:common:ErrGetSourceStore")) diff --git a/server/grpc_service.go b/server/grpc_service.go index c3d28369928..cbba6bd26ff 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1082,7 +1082,7 @@ func (s *Server) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLe if onlyAllowLeader { leaderID := s.GetLeader().GetMemberId() if leaderID != header.GetSenderId() { - return status.Errorf(codes.FailedPrecondition, "mismatch leader id, need %d but got %d", leaderID, header.GetSenderId()) + return status.Errorf(codes.FailedPrecondition, "%s, need %d but got %d", errs.MismatchLeaderErr, leaderID, header.GetSenderId()) } } return nil diff --git a/tests/client/client_test.go b/tests/client/client_test.go index ac16edf869d..fccd314b410 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -21,6 +21,7 @@ import ( "math" "path" "sort" + "strings" "sync" "testing" "time" @@ -331,6 +332,16 @@ func (s *clientTestSuite) TestGlobalAndLocalTSO(c *C) { c.Assert(p, Equals, int64(0)) c.Assert(l, Equals, int64(0)) c.Assert(err, NotNil) + + // assert global tso after resign leader + err = cluster.ResignLeader() + c.Assert(err, IsNil) + cluster.WaitLeader() + _, _, err = cli.GetTS(s.ctx) + c.Assert(err, NotNil) + c.Assert(strings.Contains(err.Error(), "mismatch leader id"), Equals, true) + _, _, err = cli.GetTS(s.ctx) + c.Assert(err, IsNil) } func (s *clientTestSuite) TestCustomTimeout(c *C) {