Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpclb: enter fallback if no balancer addresses are available #3119

Merged
merged 8 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ type lbBalancer struct {
// send to remote LB ClientConn through this resolver.
manualResolver *lbManualResolver
// The ClientConn to talk to the remote balancer.
ccRemoteLB *grpc.ClientConn
ccRemoteLB *remoteBalancerCCWrapper
// backoff for calling remote balancer.
backoff backoff.Strategy

Expand Down Expand Up @@ -426,6 +426,8 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error

addrs := ccs.ResolverState.Addresses
if len(addrs) == 0 {
// There should be at least one address, either grpclb server or
// fallback. Empty address is not valid.
return balancer.ErrBadResolverState
}

Expand All @@ -439,28 +441,33 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
}
}

if lb.ccRemoteLB == nil {
if len(remoteBalancerAddrs) == 0 {
grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
return balancer.ErrBadResolverState
if len(remoteBalancerAddrs) == 0 {
if lb.ccRemoteLB != nil {
lb.ccRemoteLB.close()
lb.ccRemoteLB = nil
}
} else if lb.ccRemoteLB == nil {
// First time receiving resolved addresses, create a cc to remote
// balancers.
lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName)
lb.newRemoteBalancerCCWrapper()
// Start the fallback goroutine.
go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
}

// cc to remote balancers uses lb.manualResolver. Send the updated remote
// balancer addresses to it through manualResolver.
lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
if lb.ccRemoteLB != nil {
// cc to remote balancers uses lb.manualResolver. Send the updated remote
// balancer addresses to it through manualResolver.
lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
}

lb.mu.Lock()
lb.resolvedBackendAddrs = backendAddrs
if lb.inFallback {
// This means we received a new list of resolved backends, and we are
// still in fallback mode. Need to update the list of backends we are
// using to the new list of backends.
if len(remoteBalancerAddrs) == 0 || lb.inFallback {
// If there's no remote balancer address in ClientConn update, grpclb
// enters fallback mode immediately.
//
// If a new update is received while grpclb is in fallback, update the
// list of backends being used to the new fallback backends.
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
lb.mu.Unlock()
Expand All @@ -475,7 +482,7 @@ func (lb *lbBalancer) Close() {
}
close(lb.doneCh)
if lb.ccRemoteLB != nil {
lb.ccRemoteLB.Close()
lb.ccRemoteLB.close()
}
lb.cc.close()
}
168 changes: 99 additions & 69 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"net"
"sync"
"time"

"github.com/golang/protobuf/proto"
Expand All @@ -34,6 +35,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -107,6 +109,12 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback

fallbackModeChanged := lb.inFallback != fallback
lb.inFallback = fallback
if fallbackModeChanged && lb.inFallback {
// Clear the previously received list when entering fallback, so that if
// the server comes back and sends the same list again, the new addresses
// will be used.
lb.fullServerList = nil
}

balancingPolicyChanged := lb.usePickFirst != pickFirst
oldUsePickFirst := lb.usePickFirst
Expand Down Expand Up @@ -196,7 +204,71 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
lb.updateStateAndPicker(true, true)
}

func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
// remoteBalancerCCWrapper bundles the ClientConn to the remote balancer with
// the state needed to run, and later stop, the goroutines that use it.
type remoteBalancerCCWrapper struct {
// cc is the ClientConn used to talk to the remote balancer.
cc *grpc.ClientConn
// lb is the lbBalancer that created this wrapper.
lb *lbBalancer
// backoff is copied from lb.backoff when the wrapper is created; it
// determines the delay between retries in watchRemoteBalancer.
backoff backoff.Strategy
// done is closed by close() to tell watchRemoteBalancer to stop.
done chan struct{}

// waitgroup to wait for all goroutines to exit.
wg sync.WaitGroup
}

// newRemoteBalancerCCWrapper creates the ClientConn used to talk to the
// remote balancer, wraps it in a remoteBalancerCCWrapper stored in
// lb.ccRemoteLB, and starts the goroutine that watches the remote balancer.
//
// A dial error is reported through grpclog.Fatalf.
func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
	// Credentials: explicit dial creds take precedence over the credentials
	// bundle; with neither configured, the connection is insecure.
	var credsOpt grpc.DialOption
	switch {
	case lb.opt.DialCreds != nil:
		credsOpt = grpc.WithTransportCredentials(lb.opt.DialCreds)
	case lb.grpclbClientConnCreds != nil:
		credsOpt = grpc.WithCredentialsBundle(lb.grpclbClientConnCreds)
	default:
		credsOpt = grpc.WithInsecure()
	}
	dialOpts := []grpc.DialOption{credsOpt}

	if dialer := lb.opt.Dialer; dialer != nil {
		dialOpts = append(dialOpts, grpc.WithContextDialer(dialer))
	}

	// Force pick_first as the balancing policy, and feed addresses to this
	// ClientConn through lb.manualResolver.
	withResolverBuilder := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption)
	dialOpts = append(dialOpts,
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`),
		withResolverBuilder(lb.manualResolver),
	)

	if channelz.IsOn() {
		dialOpts = append(dialOpts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
	}

	// Keepalive is enabled for the grpclb client, including when there is no
	// active stream.
	keepaliveParams := keepalive.ClientParameters{
		Time:                20 * time.Second,
		Timeout:             10 * time.Second,
		PermitWithoutStream: true,
	}
	dialOpts = append(dialOpts, grpc.WithKeepaliveParams(keepaliveParams))

	// The dial target itself does not matter: the grpclb server addresses
	// carry the ServerName field, and creds receive ServerName as authority.
	conn, err := grpc.DialContext(context.Background(), "grpclb.subClientConn", dialOpts...)
	if err != nil {
		grpclog.Fatalf("failed to dial: %v", err)
	}

	wrapper := &remoteBalancerCCWrapper{
		cc:      conn,
		lb:      lb,
		backoff: lb.backoff,
		done:    make(chan struct{}),
	}
	lb.ccRemoteLB = wrapper

	// Register the watcher goroutine before starting it so that close() can
	// wait for it to exit.
	wrapper.wg.Add(1)
	go wrapper.watchRemoteBalancer()
}

// close closes the ClientConn to the remote balancer and waits for all
// goroutines tracked by ccw.wg to finish.
//
// ccw.done is closed first so that watchRemoteBalancer, which checks that
// channel after each call to the remote balancer and during backoff, returns
// instead of retrying. close must be called at most once: closing ccw.done a
// second time would panic.
func (ccw *remoteBalancerCCWrapper) close() {
close(ccw.done)
ccw.cc.Close()
ccw.wg.Wait()
}

func (ccw *remoteBalancerCCWrapper) readServerList(s *balanceLoadClientStream) error {
for {
reply, err := s.Recv()
if err != nil {
Expand All @@ -206,12 +278,12 @@ func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error {
return fmt.Errorf("grpclb: failed to recv server list: %v", err)
}
if serverList := reply.GetServerList(); serverList != nil {
lb.processServerList(serverList)
ccw.lb.processServerList(serverList)
}
}
}

func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
func (ccw *remoteBalancerCCWrapper) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
Expand All @@ -220,7 +292,7 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
case <-s.Context().Done():
return
}
stats := lb.clientStats.toClientStats()
stats := ccw.lb.clientStats.toClientStats()
t := time.Now()
stats.Timestamp = &timestamppb.Timestamp{
Seconds: t.Unix(),
Expand All @@ -236,23 +308,23 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
}
}

func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
func (ccw *remoteBalancerCCWrapper) callRemoteBalancer() (backoff bool, _ error) {
lbClient := &loadBalancerClient{cc: ccw.cc}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true))
if err != nil {
return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
}
lb.mu.Lock()
lb.remoteBalancerConnected = true
lb.mu.Unlock()
ccw.lb.mu.Lock()
ccw.lb.remoteBalancerConnected = true
ccw.lb.mu.Unlock()

// grpclb handshake on the stream.
initReq := &lbpb.LoadBalanceRequest{
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
InitialRequest: &lbpb.InitialLoadBalanceRequest{
Name: lb.target,
Name: ccw.lb.target,
},
},
}
Expand All @@ -271,21 +343,24 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) {
return true, fmt.Errorf("grpclb: Delegation is not supported")
}

ccw.wg.Add(1)
go func() {
defer ccw.wg.Done()
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
lb.sendLoadReport(stream, d)
ccw.sendLoadReport(stream, d)
}
}()
// No backoff if init req/resp handshake was successful.
return false, lb.readServerList(stream)
return false, ccw.readServerList(stream)
}

func (lb *lbBalancer) watchRemoteBalancer() {
func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() {
defer ccw.wg.Done()
var retryCount int
for {
doBackoff, err := lb.callRemoteBalancer()
doBackoff, err := ccw.callRemoteBalancer()
select {
case <-lb.doneCh:
case <-ccw.done:
return
default:
if err != nil {
Expand All @@ -297,76 +372,31 @@ func (lb *lbBalancer) watchRemoteBalancer() {
}
}
// Trigger a re-resolve when the stream errors.
lb.cc.cc.ResolveNow(resolver.ResolveNowOption{})
ccw.lb.cc.cc.ResolveNow(resolver.ResolveNowOption{})

lb.mu.Lock()
lb.remoteBalancerConnected = false
lb.fullServerList = nil
ccw.lb.mu.Lock()
ccw.lb.remoteBalancerConnected = false
ccw.lb.fullServerList = nil
// Enter fallback when connection to remote balancer is lost, and the
// aggregated state is not Ready.
if !lb.inFallback && lb.state != connectivity.Ready {
if !ccw.lb.inFallback && ccw.lb.state != connectivity.Ready {
// Entering fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
ccw.lb.refreshSubConns(ccw.lb.resolvedBackendAddrs, true, ccw.lb.usePickFirst)
}
lb.mu.Unlock()
ccw.lb.mu.Unlock()

if !doBackoff {
retryCount = 0
continue
}

timer := time.NewTimer(lb.backoff.Backoff(retryCount))
timer := time.NewTimer(ccw.backoff.Backoff(retryCount)) // Copy backoff
select {
case <-timer.C:
case <-lb.doneCh:
case <-ccw.done:
timer.Stop()
return
}
retryCount++
}
}

// dialRemoteLB creates the ClientConn to the remote balancer using
// remoteLBName as the dial target, stores it in lb.ccRemoteLB, and starts
// lb.watchRemoteBalancer in a new goroutine.
//
// Credentials are chosen in this order: lb.opt.DialCreds with its server name
// overridden to remoteLBName (falling back to insecure, with a warning, if the
// override fails), then lb.grpclbClientConnCreds, then insecure. A dial error
// is reported through grpclog.Fatalf.
//
// NOTE(review): this page also shows newRemoteBalancerCCWrapper and a
// ccRemoteLB field of type *remoteBalancerCCWrapper; this function looks like
// the pre-change side of the diff -- confirm before relying on it.
func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
var dopts []grpc.DialOption
if creds := lb.opt.DialCreds; creds != nil {
if err := creds.OverrideServerName(remoteLBName); err == nil {
dopts = append(dopts, grpc.WithTransportCredentials(creds))
} else {
grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err)
dopts = append(dopts, grpc.WithInsecure())
}
} else if bundle := lb.grpclbClientConnCreds; bundle != nil {
dopts = append(dopts, grpc.WithCredentialsBundle(bundle))
} else {
dopts = append(dopts, grpc.WithInsecure())
}
if lb.opt.Dialer != nil {
dopts = append(dopts, grpc.WithContextDialer(lb.opt.Dialer))
}
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption)
dopts = append(dopts, wrb(lb.manualResolver))
if channelz.IsOn() {
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
}

// Enable Keepalive for grpclb client.
dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 20 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: true,
}))

// DialContext using manualResolver.Scheme, which is a random scheme
// generated when init grpclb. The target scheme here is not important.
//
// The grpc dial target will be used by the creds (ALTS) as the authority,
// so it has to be set to remoteLBName that comes from resolver.
cc, err := grpc.DialContext(context.Background(), remoteLBName, dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
}
lb.ccRemoteLB = cc
go lb.watchRemoteBalancer()
}
Loading