Skip to content

Commit

Permalink
etcdutil: support multi backends client (#6046)
Browse files Browse the repository at this point in the history
close #6042

Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 authored Feb 24, 2023
1 parent 0469be5 commit 833e5fc
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 43 deletions.
6 changes: 3 additions & 3 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *baseClient) memberLoop() {
failpoint.Continue()
})
if err := c.updateMember(); err != nil {
log.Error("[pd] failed updateMember", errs.ZapError(err))
log.Error("[pd] failed to update member", errs.ZapError(err))
}
}
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func (c *baseClient) updateMember() error {
errTSO = c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders())
}

// Failed to get PD leader
// Failed to get members
if err != nil {
log.Info("[pd] cannot update member from this address",
zap.String("address", u),
Expand All @@ -327,7 +327,7 @@ func (c *baseClient) updateMember() error {
// the error of `switchTSOAllocatorLeader` will be returned.
return errTSO
}
return errs.ErrClientGetLeader.FastGenByArgs(c.GetURLs())
return errs.ErrClientGetMember.FastGenByArgs(c.GetURLs())
}

func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) {
Expand Down
1 change: 0 additions & 1 deletion pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
}

func (s *Server) initClient() error {
// TODO: We need to keep all backend endpoints and keep updating them to the latest. Once one of them failed, need to try another one.
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
Expand Down
69 changes: 32 additions & 37 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,18 @@ package etcdutil
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net/http"
"net/url"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
Expand All @@ -41,6 +38,9 @@ const (
// defaultEtcdClientTimeout is the default timeout for etcd client.
defaultEtcdClientTimeout = 3 * time.Second

// defaultAutoSyncInterval is the interval to sync etcd cluster.
defaultAutoSyncInterval = 60 * time.Second

// DefaultDialTimeout is the maximum amount of time a dial will wait for a
// connection to setup. 30s is long enough for most of the network conditions.
DefaultDialTimeout = 30 * time.Second
Expand Down Expand Up @@ -186,53 +186,48 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
}

// NewTestSingleConfig is used to create an etcd config for unit tests.
func NewTestSingleConfig(t *testing.T) *embed.Config {
cfg := embed.NewConfig()
cfg.Name = "test_etcd"
cfg.Dir = t.TempDir()
cfg.WalDir = ""
cfg.Logger = "zap"
cfg.LogOutputs = []string{"stdout"}

pu, _ := url.Parse(tempurl.Alloc())
cfg.LPUrls = []url.URL{*pu}
cfg.APUrls = cfg.LPUrls
cu, _ := url.Parse(tempurl.Alloc())
cfg.LCUrls = []url.URL{*cu}
cfg.ACUrls = cfg.LCUrls

cfg.StrictReconfigCheck = false
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0])
cfg.ClusterState = embed.ClusterStateFlagNew
return cfg
}

// CreateClients creates etcd v3 client and http client.
func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
endpoints := []string{acUrls[0].String()}
lgc := zap.NewProductionConfig()
lgc.Encoding = log.ZapEncodingName
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: defaultEtcdClientTimeout,
TLS: tlsConfig,
LogConfig: &lgc,
})
client, err := createEtcdClient(tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}

httpClient := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: tlsConfig,
},
}
log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
return client, httpClient, nil
}

// createEtcdClient builds an etcd v3 client whose endpoint list contains every
// URL in acUrls, so the client is not tied to a single backend.
//
// The client is configured with AutoSyncInterval set to defaultAutoSyncInterval;
// the "autoSyncInterval" failpoint shortens it to 10ms for tests.
//
// It returns an ErrNewEtcdClient error when acUrls is empty, and otherwise
// whatever error clientv3.New reports. On error the returned client is nil.
func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) {
	if len(acUrls) == 0 {
		return nil, errs.ErrNewEtcdClient.FastGenByArgs("no available etcd address")
	}

	endpoints := make([]string, 0, len(acUrls))
	for _, u := range acUrls {
		endpoints = append(endpoints, u.String())
	}

	lgc := zap.NewProductionConfig()
	lgc.Encoding = log.ZapEncodingName

	autoSyncInterval := defaultAutoSyncInterval
	failpoint.Inject("autoSyncInterval", func() {
		autoSyncInterval = 10 * time.Millisecond
	})

	client, err := clientv3.New(clientv3.Config{
		Endpoints:        endpoints,
		DialTimeout:      defaultEtcdClientTimeout,
		AutoSyncInterval: autoSyncInterval,
		TLS:              tlsConfig,
		LogConfig:        &lgc,
	})
	if err != nil {
		return nil, err
	}
	// Log only after the client has actually been created; the previous
	// `err != nil` guard emitted this message exclusively on failure.
	log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
	return client, nil
}

// InitClusterID creates a cluster ID for the given key if it doesn't already exist.
// This function assumes the cluster ID already exists and always uses a
// cheaper read to retrieve it; if it doesn't exist, invoke the more expensive
Expand Down
64 changes: 63 additions & 1 deletion pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
Expand Down Expand Up @@ -78,7 +79,6 @@ func TestMemberHelpers(t *testing.T) {
re.NoError(err)

<-etcd2.Server.ReadyNotify()
re.NoError(err)

listResp2, err := ListEtcdMembers(client2)
re.NoError(err)
Expand Down Expand Up @@ -232,3 +232,65 @@ func TestInitClusterID(t *testing.T) {
re.NoError(err)
re.Equal(clusterID, clusterID1)
}

// TestEtcdClientSync checks that a client created by createEtcdClient with a
// single endpoint keeps working after the cluster membership changes: a second
// member is added, the first member is removed and shut down, and the same
// client must still be able to list members.
func TestEtcdClientSync(t *testing.T) {
	t.Parallel()
	re := require.New(t)

	// Shorten the client's auto-sync interval via the failpoint, and make sure
	// it is disabled again even when an assertion below aborts the test.
	const autoSyncFailpoint = "github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"
	re.NoError(failpoint.Enable(autoSyncFailpoint, "return(true)"))
	defer func() {
		re.NoError(failpoint.Disable(autoSyncFailpoint))
	}()

	// Start an etcd server.
	cfg1 := NewTestSingleConfig(t)
	etcd1, err := embed.StartEtcd(cfg1)
	re.NoError(err)

	// Create an etcd client with etcd1 as its only endpoint.
	ep1 := cfg1.LCUrls[0].String()
	urls, err := types.NewURLs([]string{ep1})
	re.NoError(err)
	client1, err := createEtcdClient(nil, urls)
	re.NoError(err)
	// Release the client's resources when the test finishes.
	defer client1.Close()
	<-etcd1.Server.ReadyNotify()

	// Add a new member. The member is registered through the client first and
	// then started with ClusterStateFlagExisting so it joins the running cluster.
	cfg2 := NewTestSingleConfig(t)
	cfg2.Name = "etcd2"
	cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
	cfg2.ClusterState = embed.ClusterStateFlagExisting
	peerURL := cfg2.LPUrls[0].String()
	addResp, err := AddEtcdMember(client1, []string{peerURL})
	re.NoError(err)
	etcd2, err := embed.StartEtcd(cfg2)
	// Check the error before registering the deferred Close: on failure etcd2
	// is not usable and closing it would panic instead of failing cleanly.
	re.NoError(err)
	defer etcd2.Close()
	re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
	<-etcd2.Server.ReadyNotify()

	// Check the client can see both members.
	listResp2, err := ListEtcdMembers(client1)
	re.NoError(err)
	re.Len(listResp2.Members, 2)
	for _, m := range listResp2.Members {
		switch m.ID {
		case uint64(etcd1.Server.ID()):
		case uint64(etcd2.Server.ID()):
		default:
			t.Fatalf("unknown member: %v", m)
		}
	}

	// Remove the first member and close etcd1.
	_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
	re.NoError(err)
	time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
	etcd1.Close()

	// Check the client still works and now only sees the remaining member.
	listResp3, err := ListEtcdMembers(client1)
	re.NoError(err)
	re.Len(listResp3.Members, 1)
	re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)
}
46 changes: 46 additions & 0 deletions pkg/utils/etcdutil/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdutil

import (
"fmt"
"net/url"
"testing"

"github.com/tikv/pd/pkg/utils/tempurl"
"go.etcd.io/etcd/embed"
)

// NewTestSingleConfig is used to create an etcd config for unit tests.
//
// The returned config describes a new single-member cluster named "test_etcd"
// whose data directory is t.TempDir(). The peer and client URLs each come from
// a separate tempurl.Alloc() call and are used as both the listen and the
// advertise URLs. The test is failed immediately if either allocated URL
// cannot be parsed.
func NewTestSingleConfig(t *testing.T) *embed.Config {
	t.Helper()

	cfg := embed.NewConfig()
	cfg.Name = "test_etcd"
	cfg.Dir = t.TempDir()
	cfg.WalDir = ""
	cfg.Logger = "zap"
	cfg.LogOutputs = []string{"stdout"}

	// Peer URLs: listen and advertise on the same address.
	pu, err := url.Parse(tempurl.Alloc())
	if err != nil {
		t.Fatalf("parse peer url: %v", err)
	}
	cfg.LPUrls = []url.URL{*pu}
	cfg.APUrls = cfg.LPUrls

	// Client URLs: listen and advertise on the same address.
	cu, err := url.Parse(tempurl.Alloc())
	if err != nil {
		t.Fatalf("parse client url: %v", err)
	}
	cfg.LCUrls = []url.URL{*cu}
	cfg.ACUrls = cfg.LCUrls

	cfg.StrictReconfigCheck = false
	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0])
	cfg.ClusterState = embed.ClusterStateFlagNew
	return cfg
}
2 changes: 1 addition & 1 deletion tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestClientClusterIDCheck(t *testing.T) {
pd.SecurityOption{}, pd.WithMaxErrorRetry(1),
)
re.Error(err)
re.Contains(err.Error(), "ErrClientGetLeader")
re.Contains(err.Error(), "ErrClientGetMember")
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipFirstUpdateMember"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipClusterIDCheck"))
}
Expand Down

0 comments on commit 833e5fc

Please sign in to comment.