Skip to content

Commit

Permalink
client: supports to add gRPC dial options
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Dec 18, 2019
1 parent 3583afd commit 360357e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 6 deletions.
40 changes: 34 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ type client struct {
ctx context.Context
cancel context.CancelFunc

security SecurityOption
security SecurityOption
gRPCDialOptions []grpc.DialOption
}

// SecurityOption records options about tls
Expand All @@ -145,13 +146,23 @@ type SecurityOption struct {
KeyPath string
}

// ClientOption configures client.
type ClientOption func(c *client)

// WithGRPCDialOptions configures the client with gRPC dial options.
func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption {
return func(c *client) {
c.gRPCDialOptions = append(c.gRPCDialOptions, opts...)
}
}

// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security)
func NewClient(pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts...)
}

// NewClientWithContext creates a PD client with context.
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption) (Client, error) {
func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", pdAddrs))
ctx1, cancel := context.WithCancel(ctx)
c := &client{
Expand All @@ -164,6 +175,9 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi
security: security,
}
c.connMu.clientConns = make(map[string]*grpc.ClientConn)
for _, opt := range opts {
opt(c)
}

if err := c.initRetry(c.initClusterID); err != nil {
cancel()
Expand All @@ -185,9 +199,23 @@ func NewClientWithContext(ctx context.Context, pdAddrs []string, security Securi

func (c *client) updateURLs(members []*pdpb.Member) {
urls := make([]string, 0, len(members))
urlsMap := make(map[string]struct{})
for _, m := range members {
urls = append(urls, m.GetClientUrls()...)
}
for _, url := range urls {
urlsMap[url] = struct{}{}
}
needUpdate := false
for _, url := range c.urls {
if _, ok := urlsMap[url]; !ok {
needUpdate = true
}
}
if !needUpdate {
return
}
log.Info("[pd] update member urls", zap.Strings("old-urls", c.urls), zap.Strings("new-urls", urls))
c.urls = urls
}

Expand Down Expand Up @@ -228,7 +256,7 @@ func (c *client) updateLeader() error {
ctx, cancel := context.WithTimeout(c.ctx, updateLeaderTimeout)
members, err := c.getMembers(ctx, u)
if err != nil {
log.Warn("cannot update leader", zap.String("address", u), zap.Error(err))
log.Warn("[pd] cannot update leader", zap.String("address", u), zap.Error(err))
}
cancel()
if err != nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 {
Expand Down Expand Up @@ -289,7 +317,7 @@ func (c *client) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error) {
return conn, nil
}

cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath)
cc, err := grpcutil.GetClientConn(addr, c.security.CAPath, c.security.CertPath, c.security.KeyPath, c.gRPCDialOptions...)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
14 changes: 14 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/core"
"go.uber.org/goleak"
"google.golang.org/grpc"
)

func TestClient(t *testing.T) {
Expand Down Expand Up @@ -490,3 +491,16 @@ func (s *testClientCtxSuite) TestClientCtx(c *C) {
c.Assert(err, NotNil)
c.Assert(time.Since(start), Less, time.Second*4)
}

var _ = Suite(&testClientDialOptionSuite{})

type testClientDialOptionSuite struct{}

func (s *testClientDialOptionSuite) TestGRPCDialOption(c *C) {
start := time.Now()
ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
defer cancel()
_, err := NewClientWithContext(ctx, []string{"localhost:8080"}, SecurityOption{}, WithGRPCDialOptions(grpc.WithBlock(), grpc.WithTimeout(time.Second)))
c.Assert(err, NotNil)
c.Assert(time.Since(start), Greater, 800*time.Millisecond)
}

0 comments on commit 360357e

Please sign in to comment.