diff --git a/server/grpc_service.go b/server/grpc_service.go index 8dee8cc9f464..c4c79fb025aa 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -68,6 +68,7 @@ var ( ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout") ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded") ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client; stream closed by server") + ErrEtcdClientRequestFailed = status.Errorf(codes.Internal, "etcd client request failed") ) // GrpcServer wraps Server to provide grpc service. @@ -275,7 +276,7 @@ func (s *GrpcServer) getMinTSFromSingleServer( } // GetMembers implements gRPC PDServer. -func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) { +func (s *GrpcServer) GetMembers(ctx context.Context, _ *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) { // Here we purposely do not check the cluster ID because the client does not know the correct cluster ID // at startup and needs to get the cluster ID with the first request (i.e. GetMembers). 
if s.IsClosed() { @@ -283,7 +284,7 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrServerNotStarted.FastGenByArgs().Error()), }, nil } - members, err := cluster.GetMembers(s.GetClient()) + members, err := cluster.GetMembers(s.GetEtcdClientWithRetry(ctx)) if err != nil { return &pdpb.GetMembersResponse{ Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), @@ -2281,9 +2282,9 @@ const globalConfigPath = "/global/config/" // StoreGlobalConfig store global config into etcd by transaction // Since item value needs to support marshal of different struct types, // it should be set to `Payload bytes` instead of `Value string` -func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { - if s.client == nil { - return nil, errors.New("failed to store global config, etcd client not found") +func (s *GrpcServer) StoreGlobalConfig(ctx context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { + if s.GetEtcdClientWithRetry(ctx) == nil { + return &pdpb.StoreGlobalConfigResponse{}, ErrEtcdClientRequestFailed } configPath := request.GetConfigPath() if configPath == "" { @@ -2319,8 +2320,8 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo // - `Names` iteratively get value from `ConfigPath/Name` but not care about revision // - `ConfigPath` if `Names` is nil can get all values and revision of current path func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) { - if s.client == nil { - return nil, errors.New("failed to load global config, etcd client not found") + if s.GetEtcdClientWithRetry(ctx) == nil { + return &pdpb.LoadGlobalConfigResponse{}, ErrEtcdClientRequestFailed } configPath := request.GetConfigPath() if configPath == "" { @@ 
-2358,11 +2359,11 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo
 // by Etcd.Watch() as long as the context has not been canceled or timed out.
 // Watch on revision which greater than or equal to the required revision.
 func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error {
-	if s.client == nil {
-		return errors.Errorf("failed to watch global config, etcd client not found")
-	}
 	ctx, cancel := context.WithCancel(s.Context())
 	defer cancel()
+	if s.GetEtcdClientWithRetry(ctx) == nil {
+		return ErrEtcdClientRequestFailed
+	}
 	configPath := req.GetConfigPath()
 	if configPath == "" {
 		configPath = globalConfigPath
diff --git a/server/server.go b/server/server.go
index 5bf26f3c58d7..5274b80bcba4 100644
--- a/server/server.go
+++ b/server/server.go
@@ -786,6 +786,36 @@ func (s *Server) GetClient() *clientv3.Client {
 	return s.client
 }
 
+// GetEtcdClientWithRetry returns the server's etcd client, waiting for it to
+// be set when it is still nil. It re-checks every retryIntervalGetServicePrimary
+// and returns nil once ctx is done or EtcdStartTimeout has elapsed.
+// NOTE(review): s.client is read here without a lock; confirm how SetClient
+// synchronizes its write.
+func (s *Server) GetEtcdClientWithRetry(ctx context.Context) *clientv3.Client {
+	// Fast path: do not allocate a timeout context and a ticker per call when
+	// the client is already available.
+	if cli := s.client; cli != nil {
+		return cli
+	}
+
+	newCtx, cancel := context.WithTimeout(ctx, EtcdStartTimeout)
+	defer cancel()
+
+	ticker := time.NewTicker(retryIntervalGetServicePrimary)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-newCtx.Done(): // timed out, or ctx itself was canceled
+			return nil
+		case <-ticker.C:
+		}
+		// Read the field once so the value checked is the value returned.
+		if cli := s.client; cli != nil {
+			return cli
+		}
+	}
+}
+
 // GetHTTPClient returns builtin http client.
func (s *Server) GetHTTPClient() *http.Client { return s.httpClient diff --git a/tests/integrations/client/global_config_test.go b/tests/integrations/client/global_config_test.go index 15034d035a69..cf00d34f0d73 100644 --- a/tests/integrations/client/global_config_test.go +++ b/tests/integrations/client/global_config_test.go @@ -335,20 +335,28 @@ func (suite *globalConfigTestSuite) TestEtcdNotStart() { suite.mu.Lock() suite.server.SetClient(nil) suite.mu.Unlock() - err := suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{ + + go func() { + time.Sleep(100 * time.Millisecond) + suite.mu.Lock() + suite.server.SetClient(cli) + suite.mu.Unlock() + }() + + server := testReceiver{re: suite.Require()} + go suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{ ConfigPath: globalConfigPath, Revision: 0, - }, nil) - suite.Error(err) + }, server) - _, err = suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + _, err := suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ ConfigPath: globalConfigPath, Changes: []*pdpb.GlobalConfigItem{{Kind: pdpb.EventType_PUT, Name: "0", Payload: []byte("0")}}, }) - suite.Error(err) + suite.NoError(err) _, err = suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ Names: []string{"test_etcd"}, }) - suite.Error(err) + suite.NoError(err) }