From 4ba4ee4ea33c348a551abf51d89823c8d8b05bab Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Sat, 9 Apr 2022 15:20:33 +0800
Subject: [PATCH] This is an automated cherry-pick of #4798

ref tikv/pd#4797

Signed-off-by: ti-chi-bot
---
 server/grpc_service.go               | 94 ++++++++++++++++++++++++----
 tests/server/cluster/cluster_test.go | 21 +++++++
 2 files changed, 102 insertions(+), 13 deletions(-)

diff --git a/server/grpc_service.go b/server/grpc_service.go
index 1bc9ebf3314..4a9c28e3141 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -45,19 +45,61 @@ import (
 	"google.golang.org/grpc/status"
 )
 
-// GrpcServer wraps Server to provide grpc service.
-type GrpcServer struct {
-	*Server
-}
+const (
+	heartbeatSendTimeout = 5 * time.Second
+	// store config
+	storeReadyWaitTime = 5 * time.Second
+
+	// tso
+	maxMergeTSORequests    = 10000
+	defaultTSOProxyTimeout = 3 * time.Second
+
+	// global config
+	globalConfigPath = "/global/config/"
+)
 
 // gRPC errors
 var (
 	// ErrNotLeader is returned when current server is not the leader and not possible to process request.
 	// TODO: work as proxy.
+<<<<<<< HEAD
 	ErrNotLeader  = status.Errorf(codes.Unavailable, "not leader")
 	ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
 )
+=======
+	ErrNotLeader            = status.Errorf(codes.Unavailable, "not leader")
+	ErrNotStarted           = status.Errorf(codes.Unavailable, "server not started")
+	ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
+)
+
+// GrpcServer wraps Server to provide grpc service.
+type GrpcServer struct {
+	*Server
+}
+
+type forwardFn func(ctx context.Context, client *grpc.ClientConn) (interface{}, error)
+
+func (s *GrpcServer) unaryMiddleware(ctx context.Context, header *pdpb.RequestHeader, fn forwardFn) (rsp interface{}, err error) {
+	failpoint.Inject("customTimeout", func() {
+		time.Sleep(5 * time.Second)
+	})
+	forwardedHost := getForwardedHost(ctx)
+	if !s.isLocalRequest(forwardedHost) {
+		client, err := s.getDelegateClient(ctx, forwardedHost)
+		if err != nil {
+			return nil, err
+		}
+		ctx = grpcutil.ResetForwardContext(ctx)
+		return fn(ctx, client)
+	}
+	if err := s.validateRequest(header); err != nil {
+		return nil, err
+	}
+	return nil, nil
+}
+
+>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
 
 // GetMembers implements gRPC PDServer.
 func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
 	// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
@@ -99,11 +141,6 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb
 	}, nil
 }
 
-const (
-	maxMergeTSORequests    = 10000
-	defaultTSOProxyTimeout = 3 * time.Second
-)
-
 // Tso implements gRPC PDServer.
 func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
 	var (
@@ -629,7 +666,36 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
 	return resp, nil
 }
 
+<<<<<<< HEAD
 const regionHeartbeatSendTimeout = 5 * time.Second
+=======
+// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error
+// occurs on SendAndClose() or Recv(), both endpoints will be closed.
+type bucketHeartbeatServer struct {
+	stream pdpb.PD_ReportBucketsServer
+	closed int32
+}
+
+func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error {
+	if atomic.LoadInt32(&b.closed) == 1 {
+		return status.Errorf(codes.Canceled, "stream is closed")
+	}
+	done := make(chan error, 1)
+	go func() {
+		done <- b.stream.SendAndClose(bucket)
+	}()
+	select {
+	case err := <-done:
+		if err != nil {
+			atomic.StoreInt32(&b.closed, 1)
+		}
+		return err
+	case <-time.After(heartbeatSendTimeout):
+		atomic.StoreInt32(&b.closed, 1)
+		return ErrSendHeartbeatTimeout
+	}
+}
+>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
 
 var errSendRegionHeartbeatTimeout = errors.New("send region heartbeat timeout")
 
@@ -654,7 +720,11 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
 		return errors.WithStack(err)
 	case <-time.After(regionHeartbeatSendTimeout):
 		atomic.StoreInt32(&s.closed, 1)
+<<<<<<< HEAD
 		return errors.WithStack(errSendRegionHeartbeatTimeout)
+=======
+		return ErrSendHeartbeatTimeout
+>>>>>>> e3fd147fd (server: partially fix grpc status (#4798))
 	}
 }
 
@@ -1418,7 +1488,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR
 // TODO: Call it in gRPC interceptor.
 func (s *GrpcServer) validateRequest(header *pdpb.RequestHeader) error {
 	if s.IsClosed() || !s.member.IsLeader() {
-		return errors.WithStack(ErrNotLeader)
+		return ErrNotLeader
 	}
 	if header.GetClusterId() != s.clusterID {
 		return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId())
@@ -1603,7 +1673,7 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL
 // the gRPC communication between PD servers internally.
 func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error {
 	if s.IsClosed() {
-		return errors.WithStack(ErrNotStarted)
+		return ErrNotStarted
 	}
 	// If onlyAllowLeader is true, check whether the sender is PD leader.
 	if onlyAllowLeader {
@@ -1701,8 +1771,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
 	<-done
 }
 
-const globalConfigPath = "/global/config/"
-
 // StoreGlobalConfig store global config into etcd by transaction
 func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
 	ops := make([]clientv3.Op, len(request.Changes))
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index a35245beae6..1ff1a82da5e 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -39,6 +39,8 @@ import (
 	syncer "github.com/tikv/pd/server/region_syncer"
 	"github.com/tikv/pd/server/schedule/operator"
 	"github.com/tikv/pd/tests"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 func Test(t *testing.T) {
@@ -408,6 +410,25 @@ func (s *clusterTestSuite) TestGetPDMembers(c *C) {
 	c.Assert(resp.GetMembers(), Not(HasLen), 0)
 }
 
+func (s *clusterTestSuite) TestNotLeader(c *C) {
+	tc, err := tests.NewTestCluster(s.ctx, 2)
+	defer tc.Destroy()
+	c.Assert(err, IsNil)
+	c.Assert(tc.RunInitialServers(), IsNil)
+
+	tc.WaitLeader()
+	followerServer := tc.GetServer(tc.GetFollower())
+	grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
+	clusterID := followerServer.GetClusterID()
+	req := &pdpb.AllocIDRequest{Header: testutil.NewRequestHeader(clusterID)}
+	resp, err := grpcPDClient.AllocID(context.Background(), req)
+	c.Assert(resp, IsNil)
+	grpcStatus, ok := status.FromError(err)
+	c.Assert(ok, IsTrue)
+	c.Assert(grpcStatus.Code(), Equals, codes.Unavailable)
+	c.Assert(grpcStatus.Message(), Equals, "not leader")
+}
+
 func (s *clusterTestSuite) TestStoreVersionChange(c *C) {
 	tc, err := tests.NewTestCluster(s.ctx, 1)
 	defer tc.Destroy()
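
The new TestNotLeader case above asserts that a follower now answers with a real gRPC status (codes.Unavailable, message "not leader") instead of a wrapped Go error. The same check works for any external caller; below is a minimal client-side sketch (not part of this patch; the standalone program, the 127.0.0.1:2379 address, and the zero cluster ID are illustrative assumptions) showing how a PD client can detect that it reached a non-leader member and retry against the leader:

package main

import (
	"context"
	"log"
	"time"

	"github.com/pingcap/kvproto/pkg/pdpb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	// Dial any PD member; it may or may not be the current leader (address is an assumption).
	conn, err := grpc.Dial("127.0.0.1:2379", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	client := pdpb.NewPDClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// The cluster ID would normally come from an earlier GetMembers call; 0 is a placeholder.
	resp, err := client.AllocID(ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: 0}})
	if err != nil {
		// With this change, a follower rejects the request with a gRPC status,
		// so the caller can branch on the code instead of parsing error strings.
		if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
			log.Printf("hit a non-leader PD member (%s); look up the leader and retry", st.Message())
			return
		}
		log.Fatalf("AllocID failed: %v", err)
	}
	log.Printf("allocated ID %d", resp.GetId())
}

Returning the status.Errorf sentinels directly, rather than errors.WithStack-wrapped values, is what makes this check reliable: grpc-go propagates the code and message of a status error to the client unchanged, whereas a wrapped error is surfaced as codes.Unknown, which is exactly what the test above guards against.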