From 6ac15df67e7113ab5c86239eea175a6c1b596346 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Wed, 18 Dec 2019 21:45:47 -0800 Subject: [PATCH] client: supports to add gRPC dial options (#2035) Signed-off-by: nolouch --- client/client.go | 36 ++++++++++++++++++++++++++++++------ client/client_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/client/client.go b/client/client.go index 8787b87512c..383c8b366be 100644 --- a/client/client.go +++ b/client/client.go @@ -15,6 +15,8 @@ package pd import ( "context" + "reflect" + "sort" "strings" "sync" "time" @@ -132,7 +134,8 @@ type client struct { ctx context.Context cancel context.CancelFunc - security SecurityOption + security SecurityOption + gRPCDialOptions []grpc.DialOption } // SecurityOption records options about tls @@ -142,13 +145,23 @@ type SecurityOption struct { KeyPath string } +// ClientOption configures client. +type ClientOption func(c *client) + +// WithGRPCDialOptions configures the client with gRPC dial options. +func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { + return func(c *client) { + c.gRPCDialOptions = append(c.gRPCDialOptions, opts...) + } +} + // NewClient creates a PD client. -func NewClient(pdAddrs []string, security SecurityOption) (Client, error) { - return NewClientWithContext(context.Background(), pdAddrs, security) +func NewClient(pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { + return NewClientWithContext(context.Background(), pdAddrs, security, opts...) } // NewClientWithContext creates a PD client with context. -func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption) (Client, error) { +func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) { log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", pdAddrs)) ctx1, cancel := context.WithCancel(ctx) c := &client{ @@ -161,6 +174,9 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi security: security, } c.connMu.clientConns = make(map[string]*grpc.ClientConn) + for _, opt := range opts { + opt(c) + } if err := c.initRetry(c.initClusterID); err != nil { cancel() @@ -185,6 +201,14 @@ func (c *client) updateURLs(members []*pdpb.Member) { for _, m := range members { urls = append(urls, m.GetClientUrls()...) } + + sort.Strings(urls) + // the url list is same. + if reflect.DeepEqual(c.urls, urls) { + return + } + + log.Info("[pd] update member urls", zap.Strings("old-urls", c.urls), zap.Strings("new-urls", urls)) c.urls = urls } @@ -225,7 +249,7 @@ func (c *client) updateLeader() error { ctx, cancel := context.WithTimeout(c.ctx, updateLeaderTimeout) members, err := c.getMembers(ctx, u) if err != nil { - log.Warn("cannot update leader", zap.String("address", u), zap.Error(err)) + log.Warn("[pd] cannot update leader", zap.String("address", u), zap.Error(err)) } cancel() if err != nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 { @@ -286,7 +310,7 @@ func (c *client) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) { return conn, nil } - cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath) + cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath, c.gRPCDialOptions...) if err != nil { return nil, errors.WithStack(err) } diff --git a/client/client_test.go b/client/client_test.go index bec87669b51..001f4c4fe76 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -471,6 +471,28 @@ func (s *testClientSuite) TestScatterRegion(c *C) { c.Succeed() } +func (s *testClientSuite) TestUpdateURLs(c *C) { + members := []*pdpb.Member{ + {Name: "pd4", ClientUrls: []string{"tmp//pd4"}}, + {Name: "pd1", ClientUrls: []string{"tmp//pd1"}}, + {Name: "pd3", ClientUrls: []string{"tmp//pd3"}}, + {Name: "pd2", ClientUrls: []string{"tmp//pd2"}}, + } + getURLs := func(ms []*pdpb.Member) (urls []string) { + for _, m := range ms { + urls = append(urls, m.GetClientUrls()[0]) + } + return + } + cli := &client{} + cli.updateURLs(members[1:]) + c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]})) + cli.updateURLs(members[1:]) + c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]})) + cli.updateURLs(members) + c.Assert(cli.urls, DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]})) +} + var _ = Suite(&testClientCtxSuite{}) type testClientCtxSuite struct{} @@ -483,3 +505,17 @@ func (s *testClientCtxSuite) TestClientCtx(c *C) { c.Assert(err, NotNil) c.Assert(time.Since(start), Less, time.Second*4) } + +var _ = Suite(&testClientDialOptionSuite{}) + +type testClientDialOptionSuite struct{} + +func (s *testClientDialOptionSuite) TestGRPCDialOption(c *C) { + start := time.Now() + ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) + defer cancel() + // nolint + _, err := NewClientWithContext(ctx, []string{"localhost:8080"}, SecurityOption{}, WithGRPCDialOptions(grpc.WithBlock(), grpc.WithTimeout(time.Second))) + c.Assert(err, NotNil) + c.Assert(time.Since(start), Greater, 800*time.Millisecond) +}