Skip to content

Commit

Permalink
Merge pull request #11215 from jpbetz/automated-cherry-pick-of-#11184…
Browse files Browse the repository at this point in the history
…-origin-release-3.3

Automated cherry pick of #11184
  • Loading branch information
jpbetz authored Oct 9, 2019
2 parents 7558b41 + a2f585d commit ef61a56
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .words
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ too_many_pings
uncontended
unprefixed
unlisting

WithDialer
18 changes: 18 additions & 0 deletions clientv3/balancer/resolver/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package endpoint

import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
Expand Down Expand Up @@ -227,3 +229,19 @@ func ParseTarget(target string) (string, string, error) {
}
return parts[0], parts[1], nil
}

// Dialer dials a endpoint using net.Dialer.
// Context cancelation and timeout are supported.
func Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
proto, host, _ := ParseEndpoint(dialEp)
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
dialer := &net.Dialer{}
if deadline, ok := ctx.Deadline(); ok {
dialer.Deadline = deadline
}
return dialer.DialContext(ctx, proto, host)
}
25 changes: 12 additions & 13 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,24 +226,17 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
}
opts = append(opts, dopts...)

// Provide a net dialer that supports cancelation and timeout.
f := func(dialEp string, t time.Duration) (net.Conn, error) {
proto, host, _ := endpoint.ParseEndpoint(dialEp)
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
default:
}
dialer := &net.Dialer{Timeout: t}
return dialer.DialContext(c.ctx, proto, host)
}
opts = append(opts, grpc.WithDialer(f))

dialer := endpoint.Dialer
if creds != nil {
opts = append(opts, grpc.WithTransportCredentials(creds))
// gRPC load balancer workaround. See credentials.transportCredential for details.
if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok {
dialer = credsDialer.Dialer
}
} else {
opts = append(opts, grpc.WithInsecure())
}
opts = append(opts, grpc.WithContextDialer(dialer))

// Interceptor retry and backoff.
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
Expand Down Expand Up @@ -667,3 +660,9 @@ func IsConnCanceled(err error) bool {
// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
return strings.Contains(err.Error(), "grpc: the client connection is closing")
}

// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details.
type TransportCredentialsWithDialer interface {
grpccredentials.TransportCredentials
Dialer(ctx context.Context, dialEp string) (net.Conn, error)
}
64 changes: 41 additions & 23 deletions clientv3/credentials/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"sync"

"github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
grpccredentials "google.golang.org/grpc/credentials"
)
Expand Down Expand Up @@ -65,38 +66,37 @@ func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
}

// transportCredential implements "grpccredentials.TransportCredentials" interface.
// transportCredential wraps TransportCredentials to track which
// addresses are dialed for which endpoints, and then sets the authority when checking the endpoint's cert to the
// hostname or IP of the dialed endpoint.
// This is a workaround of a gRPC load balancer issue. gRPC uses the dialed target's service name as the authority when
// checking all endpoint certs, which does not work for etcd servers using their hostname or IP as the Subject Alternative Name
// in their TLS certs.
// To enable, include both WithTransportCredentials(creds) and WithContextDialer(creds.Dialer)
// when dialing.
type transportCredential struct {
gtc grpccredentials.TransportCredentials
mu sync.Mutex
// addrToEndpoint maps from the connection addresses that are dialed to the hostname or IP of the
// endpoint provided to the dialer when dialing
addrToEndpoint map[string]string
}

func newTransportCredential(cfg *tls.Config) *transportCredential {
return &transportCredential{
gtc: grpccredentials.NewTLS(cfg),
gtc: grpccredentials.NewTLS(cfg),
addrToEndpoint: map[string]string{},
}
}

func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) {
// Only overwrite when authority is an IP address!
// Let's say, a server runs SRV records on "etcd.local" that resolves
// to "m1.etcd.local", and its SAN field also includes "m1.etcd.local".
// But what if SAN does not include its resolved IP address (e.g. 127.0.0.1)?
// Then, the server should only authenticate using its DNS hostname "m1.etcd.local",
// instead of overwriting it with its IP address.
// And we do not overwrite "localhost" either. Only overwrite IP addresses!
if isIP(authority) {
target := rawConn.RemoteAddr().String()
if authority != target {
// When user dials with "grpc.WithDialer", "grpc.DialContext" "cc.parsedTarget"
// update only happens once. This is problematic, because when TLS is enabled,
// retries happen through "grpc.WithDialer" with static "cc.parsedTarget" from
// the initial dial call.
// If the server authenticates by IP addresses, we want to set a new endpoint as
// a new authority. Otherwise
// "transport: authentication handshake failed: x509: certificate is valid for 127.0.0.1, 192.168.121.180, not 192.168.223.156"
// when the new dial target is "192.168.121.180" whose certificate host name is also "192.168.121.180"
// but client tries to authenticate with previously set "cc.parsedTarget" field "192.168.223.156"
authority = target
}
// Set the authority when checking the endpoint's cert to the hostname or IP of the dialed endpoint
tc.mu.Lock()
dialEp, ok := tc.addrToEndpoint[rawConn.RemoteAddr().String()]
tc.mu.Unlock()
if ok {
_, host, _ := endpoint.ParseEndpoint(dialEp)
authority = host
}
return tc.gtc.ClientHandshake(ctx, authority, rawConn)
}
Expand All @@ -115,15 +115,33 @@ func (tc *transportCredential) Info() grpccredentials.ProtocolInfo {
}

func (tc *transportCredential) Clone() grpccredentials.TransportCredentials {
copy := map[string]string{}
tc.mu.Lock()
for k, v := range tc.addrToEndpoint {
copy[k] = v
}
tc.mu.Unlock()
return &transportCredential{
gtc: tc.gtc.Clone(),
gtc: tc.gtc.Clone(),
addrToEndpoint: copy,
}
}

func (tc *transportCredential) OverrideServerName(serverNameOverride string) error {
return tc.gtc.OverrideServerName(serverNameOverride)
}

func (tc *transportCredential) Dialer(ctx context.Context, dialEp string) (net.Conn, error) {
// Keep track of which addresses are dialed for which endpoints
conn, err := endpoint.Dialer(ctx, dialEp)
if conn != nil {
tc.mu.Lock()
tc.addrToEndpoint[conn.RemoteAddr().String()] = dialEp
tc.mu.Unlock()
}
return conn, err
}

// perRPCCredential implements "grpccredentials.PerRPCCredentials" interface.
type perRPCCredential struct {
authToken string
Expand Down

0 comments on commit ef61a56

Please sign in to comment.