diff --git a/client/base_client.go b/client/base_client.go
index 2b956cf14b6..aa48c7f5a9f 100644
--- a/client/base_client.go
+++ b/client/base_client.go
@@ -143,7 +143,7 @@ func (c *baseClient) memberLoop() {
 			failpoint.Continue()
 		})
 		if err := c.updateMember(); err != nil {
-			log.Error("[pd] failed updateMember", errs.ZapError(err))
+			log.Error("[pd] failed to update member", errs.ZapError(err))
 		}
 	}
 }
@@ -303,7 +303,7 @@ func (c *baseClient) updateMember() error {
 			errTSO = c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders())
 		}
 
-		// Failed to get PD leader
+		// Failed to get members
 		if err != nil {
 			log.Info("[pd] cannot update member from this address",
 				zap.String("address", u),
@@ -327,7 +327,7 @@ func (c *baseClient) updateMember() error {
 		// the error of `switchTSOAllocatorLeader` will be returned.
 		return errTSO
 	}
-	return errs.ErrClientGetLeader.FastGenByArgs(c.GetURLs())
+	return errs.ErrClientGetMember.FastGenByArgs(c.GetURLs())
 }
 
 func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) {
diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go
index 3004f1a08e8..e56e38e3c11 100644
--- a/pkg/mcs/resource_manager/server/server.go
+++ b/pkg/mcs/resource_manager/server/server.go
@@ -151,7 +151,6 @@ func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
 }
 
 func (s *Server) initClient() error {
-	// TODO: We need to keep all backend endpoints and keep updating them to the latest. Once one of them failed, need to try another one.
 	tlsConfig, err := s.cfg.Security.ToTLSConfig()
 	if err != nil {
 		return err
diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go
index 703b6e9033f..b055f79e75c 100644
--- a/pkg/utils/etcdutil/etcdutil.go
+++ b/pkg/utils/etcdutil/etcdutil.go
@@ -17,21 +17,18 @@ package etcdutil
 import (
 	"context"
 	"crypto/tls"
-	"fmt"
 	"math/rand"
 	"net/http"
 	"net/url"
-	"testing"
 	"time"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/errs"
-	"github.com/tikv/pd/pkg/utils/tempurl"
 	"github.com/tikv/pd/pkg/utils/typeutil"
 	"go.etcd.io/etcd/clientv3"
-	"go.etcd.io/etcd/embed"
 	"go.etcd.io/etcd/etcdserver"
 	"go.etcd.io/etcd/pkg/types"
 	"go.uber.org/zap"
@@ -41,6 +38,9 @@ const (
 	// defaultEtcdClientTimeout is the default timeout for etcd client.
 	defaultEtcdClientTimeout = 3 * time.Second
 
+	// defaultAutoSyncInterval is the interval to sync etcd cluster.
+	defaultAutoSyncInterval = 60 * time.Second
+
 	// DefaultDialTimeout is the maximum amount of time a dial will wait for a
 	// connection to setup. 30s is long enough for most of the network conditions.
 	DefaultDialTimeout = 30 * time.Second
@@ -186,53 +186,51 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
 	return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
 }
 
-// NewTestSingleConfig is used to create a etcd config for the unit test purpose.
-func NewTestSingleConfig(t *testing.T) *embed.Config {
-	cfg := embed.NewConfig()
-	cfg.Name = "test_etcd"
-	cfg.Dir = t.TempDir()
-	cfg.WalDir = ""
-	cfg.Logger = "zap"
-	cfg.LogOutputs = []string{"stdout"}
-
-	pu, _ := url.Parse(tempurl.Alloc())
-	cfg.LPUrls = []url.URL{*pu}
-	cfg.APUrls = cfg.LPUrls
-	cu, _ := url.Parse(tempurl.Alloc())
-	cfg.LCUrls = []url.URL{*cu}
-	cfg.ACUrls = cfg.LCUrls
-
-	cfg.StrictReconfigCheck = false
-	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0])
-	cfg.ClusterState = embed.ClusterStateFlagNew
-	return cfg
-}
-
 // CreateClients creates etcd v3 client and http client.
 func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
-	endpoints := []string{acUrls[0].String()}
-	lgc := zap.NewProductionConfig()
-	lgc.Encoding = log.ZapEncodingName
-	client, err := clientv3.New(clientv3.Config{
-		Endpoints:   endpoints,
-		DialTimeout: defaultEtcdClientTimeout,
-		TLS:         tlsConfig,
-		LogConfig:   &lgc,
-	})
+	client, err := createEtcdClient(tlsConfig, acUrls)
 	if err != nil {
 		return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
 	}
-
 	httpClient := &http.Client{
 		Transport: &http.Transport{
 			DisableKeepAlives: true,
 			TLSClientConfig:   tlsConfig,
 		},
 	}
-	log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
 	return client, httpClient, nil
 }
 
+// createEtcdClient creates an etcd v3 client using every URL in acUrls as an
+// endpoint, with AutoSyncInterval set so the endpoints are refreshed from the
+// cluster members. It returns an error if acUrls is empty.
+func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) {
+	if len(acUrls) == 0 {
+		return nil, errs.ErrNewEtcdClient.FastGenByArgs("no available etcd address")
+	}
+	endpoints := make([]string, 0, len(acUrls))
+	for _, u := range acUrls {
+		endpoints = append(endpoints, u.String())
+	}
+	lgc := zap.NewProductionConfig()
+	lgc.Encoding = log.ZapEncodingName
+	autoSyncInterval := defaultAutoSyncInterval
+	failpoint.Inject("autoSyncInterval", func() {
+		autoSyncInterval = 10 * time.Millisecond
+	})
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:        endpoints,
+		DialTimeout:      defaultEtcdClientTimeout,
+		AutoSyncInterval: autoSyncInterval,
+		TLS:              tlsConfig,
+		LogConfig:        &lgc,
+	})
+	if err == nil {
+		log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
+	}
+	return client, err
+}
+
 // InitClusterID creates a cluster ID for the given key if it hasn't existed.
 // This function assumes the cluster ID has already existed and always use a
 // cheaper read to retrieve it; if it doesn't exist, invoke the more expensive
diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go
index e1d9177f7f8..79348e02eaf 100644
--- a/pkg/utils/etcdutil/etcdutil_test.go
+++ b/pkg/utils/etcdutil/etcdutil_test.go
@@ -21,6 +21,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/embed"
@@ -78,7 +79,6 @@ func TestMemberHelpers(t *testing.T) {
 	re.NoError(err)
 
 	<-etcd2.Server.ReadyNotify()
-	re.NoError(err)
 
 	listResp2, err := ListEtcdMembers(client2)
 	re.NoError(err)
@@ -232,3 +232,65 @@ func TestInitClusterID(t *testing.T) {
 	re.NoError(err)
 	re.Equal(clusterID, clusterID1)
 }
+
+func TestEtcdClientSync(t *testing.T) {
+	t.Parallel()
+	re := require.New(t)
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))
+
+	// Start a etcd server.
+	cfg1 := NewTestSingleConfig(t)
+	etcd1, err := embed.StartEtcd(cfg1)
+	re.NoError(err)
+
+	// Create a etcd client with etcd1 as endpoint.
+	ep1 := cfg1.LCUrls[0].String()
+	urls, err := types.NewURLs([]string{ep1})
+	re.NoError(err)
+	client1, err := createEtcdClient(nil, urls)
+	re.NoError(err)
+	<-etcd1.Server.ReadyNotify()
+
+	// Add a new member.
+	cfg2 := NewTestSingleConfig(t)
+	cfg2.Name = "etcd2"
+	cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
+	cfg2.ClusterState = embed.ClusterStateFlagExisting
+	peerURL := cfg2.LPUrls[0].String()
+	addResp, err := AddEtcdMember(client1, []string{peerURL})
+	re.NoError(err)
+	etcd2, err := embed.StartEtcd(cfg2)
+	re.NoError(err)
+	defer func() {
+		etcd2.Close()
+	}()
+	re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
+	<-etcd2.Server.ReadyNotify()
+
+	// Check the client can get the new member.
+	listResp2, err := ListEtcdMembers(client1)
+	re.NoError(err)
+	re.Len(listResp2.Members, 2)
+	for _, m := range listResp2.Members {
+		switch m.ID {
+		case uint64(etcd1.Server.ID()):
+		case uint64(etcd2.Server.ID()):
+		default:
+			t.Fatalf("unknown member: %v", m)
+		}
+	}
+
+	// Remove the first member and close the etcd1.
+	_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
+	re.NoError(err)
+	time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
+	etcd1.Close()
+
+	// Check the client can get the new member with the new endpoints.
+	listResp3, err := ListEtcdMembers(client1)
+	re.NoError(err)
+	re.Len(listResp3.Members, 1)
+	re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)
+
+	require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
+}
diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go
new file mode 100644
index 00000000000..a29a64881c9
--- /dev/null
+++ b/pkg/utils/etcdutil/testutil.go
@@ -0,0 +1,46 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdutil
+
+import (
+	"fmt"
+	"net/url"
+	"testing"
+
+	"github.com/tikv/pd/pkg/utils/tempurl"
+	"go.etcd.io/etcd/embed"
+)
+
+// NewTestSingleConfig is used to create a etcd config for the unit test purpose.
+func NewTestSingleConfig(t *testing.T) *embed.Config {
+	cfg := embed.NewConfig()
+	cfg.Name = "test_etcd"
+	cfg.Dir = t.TempDir()
+	cfg.WalDir = ""
+	cfg.Logger = "zap"
+	cfg.LogOutputs = []string{"stdout"}
+
+	pu, _ := url.Parse(tempurl.Alloc())
+	cfg.LPUrls = []url.URL{*pu}
+	cfg.APUrls = cfg.LPUrls
+	cu, _ := url.Parse(tempurl.Alloc())
+	cfg.LCUrls = []url.URL{*cu}
+	cfg.ACUrls = cfg.LCUrls
+
+	cfg.StrictReconfigCheck = false
+	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0])
+	cfg.ClusterState = embed.ClusterStateFlagNew
+	return cfg
+}
diff --git a/tests/client/client_test.go b/tests/client/client_test.go
index ca1f5f9ac82..53e924a1d24 100644
--- a/tests/client/client_test.go
+++ b/tests/client/client_test.go
@@ -91,7 +91,7 @@ func TestClientClusterIDCheck(t *testing.T) {
 		pd.SecurityOption{}, pd.WithMaxErrorRetry(1),
 	)
 	re.Error(err)
-	re.Contains(err.Error(), "ErrClientGetLeader")
+	re.Contains(err.Error(), "ErrClientGetMember")
 	re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipFirstUpdateMember"))
 	re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipClusterIDCheck"))
 }