From 60deae398c088210dfe55d7af5e5a387b2211747 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 16 Oct 2019 16:17:23 -0700 Subject: [PATCH 1/8] grpclb: enter fallback if no balancer addresses are available --- balancer/grpclb/grpclb.go | 21 +-- balancer/grpclb/grpclb_remote_balancer.go | 11 +- balancer/grpclb/grpclb_test.go | 167 ++++++++++++++++------ 3 files changed, 136 insertions(+), 63 deletions(-) diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index b581978f8512..4ca5dd85562c 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -426,6 +426,8 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error addrs := ccs.ResolverState.Addresses if len(addrs) == 0 { + // There should be at least one address, either grpclb server or + // fallback. An empty address list is not valid. return balancer.ErrBadResolverState } @@ -440,27 +442,28 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error } if lb.ccRemoteLB == nil { - if len(remoteBalancerAddrs) == 0 { - grpclog.Errorf("grpclb: no remote balancer address is available, should never happen") - return balancer.ErrBadResolverState - } // First time receiving resolved addresses, create a cc to remote // balancers. - lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName) + lb.dialRemoteLB() // Start the fallback goroutine. go lb.fallbackToBackendsAfter(lb.fallbackTimeout) } // cc to remote balancers uses lb.manualResolver. Send the updated remote // balancer addresses to it through manualResolver. + // + // If remoteBalancerAddrs is empty, this could trigger resolveNow() in the + // sub-ClientConn. lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs}) lb.mu.Lock() lb.resolvedBackendAddrs = backendAddrs - if lb.inFallback { - // This means we received a new list of resolved backends, and we are - // still in fallback mode. Need to update the list of backends we are - // using to the new list of backends. 
+ if len(remoteBalancerAddrs) == 0 || lb.inFallback { + // If there's no remote balancer address in ClientConn update, grpclb + // enters fallback mode immediately. + // + // If a new update is received while grpclb is in fallback, update the + // list of backends being used to the new fallback backends. lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) } lb.mu.Unlock() diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index bf9e523cd4bc..20e4ff879456 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -326,15 +326,10 @@ func (lb *lbBalancer) watchRemoteBalancer() { } } -func (lb *lbBalancer) dialRemoteLB(remoteLBName string) { +func (lb *lbBalancer) dialRemoteLB() { var dopts []grpc.DialOption if creds := lb.opt.DialCreds; creds != nil { - if err := creds.OverrideServerName(remoteLBName); err == nil { - dopts = append(dopts, grpc.WithTransportCredentials(creds)) - } else { - grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v, using Insecure", err) - dopts = append(dopts, grpc.WithInsecure()) - } + dopts = append(dopts, grpc.WithTransportCredentials(creds)) } else if bundle := lb.grpclbClientConnCreds; bundle != nil { dopts = append(dopts, grpc.WithCredentialsBundle(bundle)) } else { @@ -363,7 +358,7 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) { // // The grpc dial target will be used by the creds (ALTS) as the authority, // so it has to be set to remoteLBName that comes from resolver. - cc, err := grpc.DialContext(context.Background(), remoteLBName, dopts...) + cc, err := grpc.DialContext(context.Background(), "grpclb.subClientConn", dopts...) 
if err != nil { grpclog.Fatalf("failed to dial: %v", err) } diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 55f00706985f..1ea9b4bd28de 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -49,8 +49,8 @@ import ( ) var ( - lbServerName = "bar.com" - beServerName = "foo.com" + lbServerName = "lb.server.com" + beServerName = "backends.com" lbToken = "iamatoken" // Resolver replaces localhost with fakeName in Next(). @@ -60,9 +60,8 @@ var ( ) type serverNameCheckCreds struct { - mu sync.Mutex - sn string - expected string + mu sync.Mutex + sn string } func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { @@ -72,10 +71,10 @@ func (c *serverNameCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, cred } return rawConn, nil, nil } -func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { +func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { c.mu.Lock() defer c.mu.Unlock() - b := make([]byte, len(c.expected)) + b := make([]byte, len(authority)) errCh := make(chan error, 1) go func() { _, err := rawConn.Read(b) @@ -84,34 +83,25 @@ func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, addr string, select { case err := <-errCh: if err != nil { - fmt.Printf("Failed to read the server name from the server %v", err) + fmt.Printf("test-creds: failed to read expected authority name from the server: %v\n", err) return nil, nil, err } case <-ctx.Done(): return nil, nil, ctx.Err() } - if c.expected != string(b) { - fmt.Printf("Read the server name %s want %s", string(b), c.expected) - return nil, nil, errors.New("received unexpected server name") + if authority != string(b) { + fmt.Printf("test-creds: got authority from ClientConn %q, expected by server %q\n", authority, 
string(b)) + return nil, nil, errors.New("received unexpected server nameq") } return rawConn, nil, nil } func (c *serverNameCheckCreds) Info() credentials.ProtocolInfo { - c.mu.Lock() - defer c.mu.Unlock() return credentials.ProtocolInfo{} } func (c *serverNameCheckCreds) Clone() credentials.TransportCredentials { - c.mu.Lock() - defer c.mu.Unlock() - return &serverNameCheckCreds{ - expected: c.expected, - } + return &serverNameCheckCreds{} } func (c *serverNameCheckCreds) OverrideServerName(s string) error { - c.mu.Lock() - defer c.mu.Unlock() - c.expected = s return nil } @@ -388,9 +378,7 @@ func TestGRPCLB(t *testing.T) { Servers: bes, } tss.ls.sls <- sl - creds := serverNameCheckCreds{ - expected: beServerName, - } + creds := serverNameCheckCreds{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, @@ -439,9 +427,7 @@ func TestGRPCLBWeighted(t *testing.T) { portsToIndex[tss.bePorts[i]] = i } - creds := serverNameCheckCreds{ - expected: beServerName, - } + creds := serverNameCheckCreds{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, @@ -509,9 +495,7 @@ func TestDropRequest(t *testing.T) { Drop: true, }}, } - creds := serverNameCheckCreds{ - expected: beServerName, - } + creds := serverNameCheckCreds{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, @@ -671,9 +655,7 @@ func TestBalancerDisconnects(t *testing.T) { lbs = append(lbs, tss.lb) } - creds := serverNameCheckCreds{ - expected: beServerName, - } + creds := serverNameCheckCreds{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, @@ -752,9 +734,7 @@ func TestFallback(t *testing.T) { Servers: bes, } 
tss.ls.sls <- sl - creds := serverNameCheckCreds{ - expected: beServerName, - } + creds := serverNameCheckCreds{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, @@ -770,9 +750,8 @@ func TestFallback(t *testing.T) { Type: resolver.GRPCLB, ServerName: lbServerName, }, { - Addr: beLis.Addr().String(), - Type: resolver.Backend, - ServerName: beServerName, + Addr: beLis.Addr().String(), + Type: resolver.Backend, }}}) var p peer.Peer @@ -788,9 +767,8 @@ func TestFallback(t *testing.T) { Type: resolver.GRPCLB, ServerName: lbServerName, }, { - Addr: beLis.Addr().String(), - Type: resolver.Backend, - ServerName: beServerName, + Addr: beLis.Addr().String(), + Type: resolver.Backend, }}}) var backendUsed bool @@ -851,6 +829,105 @@ func TestFallback(t *testing.T) { } } +func TestFallBackWithNoServerAddress(t *testing.T) { + defer leakcheck.Check(t) + + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + + tss, cleanup, err := newLoadBalancer(1) + if err != nil { + t.Fatalf("failed to create new load balancer: %v", err) + } + defer cleanup() + + // Start a standalone backend. 
+ beLis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to listen %v", err) + } + defer beLis.Close() + standaloneBEs := startBackends(beServerName, true, beLis) + defer stopBackends(standaloneBEs) + + be := &lbpb.Server{ + IpAddress: tss.beIPs[0], + Port: int32(tss.bePorts[0]), + LoadBalanceToken: lbToken, + } + var bes []*lbpb.Server + bes = append(bes, be) + sl := &lbpb.ServerList{ + Servers: bes, + } + tss.ls.sls <- sl + creds := serverNameCheckCreds{} + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, + grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer)) + if err != nil { + t.Fatalf("Failed to dial to the backend %v", err) + } + defer cc.Close() + testC := testpb.NewTestServiceClient(cc) + + // Select grpclb with service config. + const pfc = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}` + scpr := r.CC.ParseServiceConfig(pfc) + if scpr.Err != nil { + t.Fatalf("Error parsing config %q: %v", pfc, scpr.Err) + } + + // Send an update with only backend address. grpclb should enter fallback + // and use the fallback backend. + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{ + Addr: beLis.Addr().String(), + Type: resolver.Backend, + }}, + ServiceConfig: scpr, + }) + + var p peer.Peer + rpcCtx, rpcCancel := context.WithTimeout(context.Background(), time.Second) + defer rpcCancel() + if _, err := testC.EmptyCall(rpcCtx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) + } + if p.Addr.String() != beLis.Addr().String() { + t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr()) + } + + // Send an update with balancer address. The backends behind grpclb should + // be used. 
+ r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{ + Addr: tss.lbAddr, + Type: resolver.GRPCLB, + ServerName: lbServerName, + }, { + Addr: beLis.Addr().String(), + Type: resolver.Backend, + }}, + }) + + var backendUsed bool + for i := 0; i < 1000; i++ { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) + } + if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { + backendUsed = true + break + } + time.Sleep(time.Millisecond) + } + if !backendUsed { + t.Fatalf("No RPC sent to backend behind remote balancer after 1 second") + } +} + func TestGRPCLBPickFirst(t *testing.T) { defer leakcheck.Check(t) @@ -881,9 +958,7 @@ func TestGRPCLBPickFirst(t *testing.T) { portsToIndex[tss.bePorts[i]] = i } - creds := serverNameCheckCreds{ - expected: beServerName, - } + creds := serverNameCheckCreds{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, @@ -1034,7 +1109,7 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp } tss.ls.sls <- &lbpb.ServerList{Servers: servers} tss.ls.statsDura = 100 * time.Millisecond - creds := serverNameCheckCreds{expected: beServerName} + creds := serverNameCheckCreds{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() From f0636988a08062f94521018e49d3cacc1cf7be66 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 21 Oct 2019 16:06:35 -0700 Subject: [PATCH 2/8] add tests, re-resolve failing --- balancer/grpclb/grpclb_remote_balancer.go | 7 +- balancer/grpclb/grpclb_test.go | 138 +++++++++++++++------- 2 files changed, 102 insertions(+), 43 deletions(-) diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 20e4ff879456..f27993a15407 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go 
+++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -107,6 +107,11 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback fallbackModeChanged := lb.inFallback != fallback lb.inFallback = fallback + if fallbackModeChanged && lb.inFallback { + // Clear previous received list, so if the server is back and sends the + // same list again, the new addresses will be used. + lb.fullServerList = nil + } balancingPolicyChanged := lb.usePickFirst != pickFirst oldUsePickFirst := lb.usePickFirst @@ -339,7 +344,7 @@ func (lb *lbBalancer) dialRemoteLB() { dopts = append(dopts, grpc.WithContextDialer(lb.opt.Dialer)) } // Explicitly set pickfirst as the balancer. - dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName)) + dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`)) wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption) dopts = append(dopts, wrb(lb.manualResolver)) if channelz.IsOn() { diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 1ea9b4bd28de..5ea0cc7d6682 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -829,10 +829,55 @@ func TestFallback(t *testing.T) { } } +type pickfirstFailOnEmptyAddrsListBuilder struct { + balancer.Builder // pick_first builder. +} + +func (b *pickfirstFailOnEmptyAddrsListBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + pf := b.Builder.Build(cc, opts) + return &pickfirstFailOnEmptyAddrsList{pf} +} + +type pickfirstFailOnEmptyAddrsList struct { + balancer.Balancer // pick_first balancer. 
+} + +func (b *pickfirstFailOnEmptyAddrsList) UpdateClientConnState(s balancer.ClientConnState) error { + addrs := s.ResolverState.Addresses + if len(addrs) == 0 { + return balancer.ErrBadResolverState + } + b.Balancer.HandleResolvedAddrs(addrs, nil) + return nil +} + +func (b *pickfirstFailOnEmptyAddrsList) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + b.Balancer.HandleSubConnStateChange(sc, state.ConnectivityState) +} + +func (b *pickfirstFailOnEmptyAddrsList) ResolverError(error) {} + func TestFallBackWithNoServerAddress(t *testing.T) { defer leakcheck.Check(t) + defer func() func() { + // Override pick_first with a balancer that returns error to trigger + // re-resolve, to test that when grpclb accepts no server address, + // re-resolve is never triggered. + pfb := balancer.Get("pick_first") + balancer.Register(&pickfirstFailOnEmptyAddrsListBuilder{pfb}) + return func() { balancer.Register(pfb) } + }()() + + resolveNowCh := make(chan struct{}, 1) r, cleanup := manual.GenerateAndRegisterManualResolver() + r.ResolveNowCallback = func(resolver.ResolveNowOption) { + select { + case <-resolveNowCh: + default: + } + resolveNowCh <- struct{}{} + } defer cleanup() tss, cleanup, err := newLoadBalancer(1) @@ -860,7 +905,6 @@ func TestFallBackWithNoServerAddress(t *testing.T) { sl := &lbpb.ServerList{ Servers: bes, } - tss.ls.sls <- sl creds := serverNameCheckCreds{} ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -879,52 +923,62 @@ func TestFallBackWithNoServerAddress(t *testing.T) { t.Fatalf("Error parsing config %q: %v", pfc, scpr.Err) } - // Send an update with only backend address. grpclb should enter fallback - // and use the fallback backend. - r.UpdateState(resolver.State{ - Addresses: []resolver.Address{{ - Addr: beLis.Addr().String(), - Type: resolver.Backend, - }}, - ServiceConfig: scpr, - }) + for i := 0; i < 2; i++ { + // Send an update with only backend address. 
grpclb should enter fallback + // and use the fallback backend. + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{ + Addr: beLis.Addr().String(), + Type: resolver.Backend, + }}, + ServiceConfig: scpr, + }) - var p peer.Peer - rpcCtx, rpcCancel := context.WithTimeout(context.Background(), time.Second) - defer rpcCancel() - if _, err := testC.EmptyCall(rpcCtx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { - t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) - } - if p.Addr.String() != beLis.Addr().String() { - t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr()) - } + select { + case <-resolveNowCh: + t.Fatalf("unexpected resolveNow when grpclb gets no balancer address") + case <-time.After(time.Second): + } - // Send an update with balancer address. The backends behind grpclb should - // be used. - r.UpdateState(resolver.State{ - Addresses: []resolver.Address{{ - Addr: tss.lbAddr, - Type: resolver.GRPCLB, - ServerName: lbServerName, - }, { - Addr: beLis.Addr().String(), - Type: resolver.Backend, - }}, - }) + var p peer.Peer + rpcCtx, rpcCancel := context.WithTimeout(context.Background(), time.Second) + defer rpcCancel() + if _, err := testC.EmptyCall(rpcCtx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) + } + if p.Addr.String() != beLis.Addr().String() { + t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr()) + } - var backendUsed bool - for i := 0; i < 1000; i++ { - if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { - t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) + tss.ls.sls <- sl + // Send an update with balancer address. The backends behind grpclb should + // be used. 
+ r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{ + Addr: tss.lbAddr, + Type: resolver.GRPCLB, + ServerName: lbServerName, + }, { + Addr: beLis.Addr().String(), + Type: resolver.Backend, + }}, + ServiceConfig: scpr, + }) + + var backendUsed bool + for i := 0; i < 1000; i++ { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) + } + if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { + backendUsed = true + break + } + time.Sleep(time.Millisecond) } - if p.Addr.(*net.TCPAddr).Port == tss.bePorts[0] { - backendUsed = true - break + if !backendUsed { + t.Fatalf("No RPC sent to backend behind remote balancer after 1 second") } - time.Sleep(time.Millisecond) - } - if !backendUsed { - t.Fatalf("No RPC sent to backend behind remote balancer after 1 second") } } From bd20ec1f9e380a6a8d6706fa430d997fcf9c40aa Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 22 Oct 2019 15:46:07 -0700 Subject: [PATCH 3/8] move ccremote to a separate struct --- balancer/grpclb/grpclb.go | 11 +- balancer/grpclb/grpclb_remote_balancer.go | 147 ++++++++++++---------- balancer/grpclb/grpclb_test.go | 10 +- 3 files changed, 96 insertions(+), 72 deletions(-) diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 4ca5dd85562c..4aa5384d69a2 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -187,7 +187,7 @@ type lbBalancer struct { // send to remote LB ClientConn through this resolver. manualResolver *lbManualResolver // The ClientConn to talk to the remote balancer. - ccRemoteLB *grpc.ClientConn + ccRemoteLB *remoteBalancerCCWrapper // backoff for calling remote balancer. 
backoff backoff.Strategy @@ -441,10 +441,15 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error } } + // if len(remoteBalancerAddrs) == 0 && lb.ccRemoteLB != nil { + // lb.ccRemoteLB.close() + // lb.ccRemoteLB = nil + // } + if lb.ccRemoteLB == nil { // First time receiving resolved addresses, create a cc to remote // balancers. - lb.dialRemoteLB() + lb.newRemoteBalancerCCWrapper() // Start the fallback goroutine. go lb.fallbackToBackendsAfter(lb.fallbackTimeout) } @@ -478,7 +483,7 @@ func (lb *lbBalancer) Close() { } close(lb.doneCh) if lb.ccRemoteLB != nil { - lb.ccRemoteLB.Close() + lb.ccRemoteLB.close() } lb.cc.close() } diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index f27993a15407..b326ffdaa7b8 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" @@ -201,7 +202,65 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback lb.updateStateAndPicker(true, true) } -func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error { +type remoteBalancerCCWrapper struct { + cc *grpc.ClientConn + lb *lbBalancer + backoff backoff.Strategy + done chan struct{} +} + +func (lb *lbBalancer) newRemoteBalancerCCWrapper() { + var dopts []grpc.DialOption + if creds := lb.opt.DialCreds; creds != nil { + dopts = append(dopts, grpc.WithTransportCredentials(creds)) + } else if bundle := lb.grpclbClientConnCreds; bundle != nil { + dopts = append(dopts, grpc.WithCredentialsBundle(bundle)) + } else { + dopts = append(dopts, grpc.WithInsecure()) + } + if lb.opt.Dialer != nil { + dopts = append(dopts, grpc.WithContextDialer(lb.opt.Dialer)) 
+ } + // Explicitly set pickfirst as the balancer. + dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`)) + wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption) + dopts = append(dopts, wrb(lb.manualResolver)) + if channelz.IsOn() { + dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID)) + } + + // Enable Keepalive for grpclb client. + dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 20 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: true, + })) + + // DialContext using manualResolver.Scheme, which is a random scheme + // generated when init grpclb. The target scheme here is not important. + // + // The grpc dial target will be used by the creds (ALTS) as the authority, + // so it has to be set to remoteLBName that comes from resolver. + cc, err := grpc.DialContext(context.Background(), "grpclb.subClientConn", dopts...) + if err != nil { + grpclog.Fatalf("failed to dial: %v", err) + } + ccw := &remoteBalancerCCWrapper{ + cc: cc, + lb: lb, + backoff: lb.backoff, + done: make(chan struct{}), + } + lb.ccRemoteLB = ccw + go ccw.watchRemoteBalancer() +} + +func (ccw *remoteBalancerCCWrapper) close() { + close(ccw.done) + ccw.cc.Close() +} + +func (ccw *remoteBalancerCCWrapper) readServerList(s *balanceLoadClientStream) error { for { reply, err := s.Recv() if err != nil { @@ -211,12 +270,12 @@ func (lb *lbBalancer) readServerList(s *balanceLoadClientStream) error { return fmt.Errorf("grpclb: failed to recv server list: %v", err) } if serverList := reply.GetServerList(); serverList != nil { - lb.processServerList(serverList) + ccw.lb.processServerList(serverList) } } } -func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) { +func (ccw *remoteBalancerCCWrapper) sendLoadReport(s *balanceLoadClientStream, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -225,7 
+284,7 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D case <-s.Context().Done(): return } - stats := lb.clientStats.toClientStats() + stats := ccw.lb.clientStats.toClientStats() t := time.Now() stats.Timestamp = &timestamppb.Timestamp{ Seconds: t.Unix(), @@ -241,23 +300,23 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D } } -func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) { - lbClient := &loadBalancerClient{cc: lb.ccRemoteLB} +func (ccw *remoteBalancerCCWrapper) callRemoteBalancer() (backoff bool, _ error) { + lbClient := &loadBalancerClient{cc: ccw.cc} ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true)) if err != nil { return true, fmt.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err) } - lb.mu.Lock() - lb.remoteBalancerConnected = true - lb.mu.Unlock() + ccw.lb.mu.Lock() + ccw.lb.remoteBalancerConnected = true + ccw.lb.mu.Unlock() // grpclb handshake on the stream. initReq := &lbpb.LoadBalanceRequest{ LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{ InitialRequest: &lbpb.InitialLoadBalanceRequest{ - Name: lb.target, + Name: ccw.lb.target, }, }, } @@ -278,19 +337,19 @@ func (lb *lbBalancer) callRemoteBalancer() (backoff bool, _ error) { go func() { if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 { - lb.sendLoadReport(stream, d) + ccw.sendLoadReport(stream, d) } }() // No backoff if init req/resp handshake was successful. 
- return false, lb.readServerList(stream) + return false, ccw.readServerList(stream) } -func (lb *lbBalancer) watchRemoteBalancer() { +func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() { var retryCount int for { - doBackoff, err := lb.callRemoteBalancer() + doBackoff, err := ccw.callRemoteBalancer() select { - case <-lb.doneCh: + case <-ccw.done: return default: if err != nil { @@ -302,71 +361,31 @@ func (lb *lbBalancer) watchRemoteBalancer() { } } // Trigger a re-resolve when the stream errors. - lb.cc.cc.ResolveNow(resolver.ResolveNowOption{}) + ccw.lb.cc.cc.ResolveNow(resolver.ResolveNowOption{}) - lb.mu.Lock() - lb.remoteBalancerConnected = false - lb.fullServerList = nil + ccw.lb.mu.Lock() + ccw.lb.remoteBalancerConnected = false + ccw.lb.fullServerList = nil // Enter fallback when connection to remote balancer is lost, and the // aggregated state is not Ready. - if !lb.inFallback && lb.state != connectivity.Ready { + if !ccw.lb.inFallback && ccw.lb.state != connectivity.Ready { // Entering fallback. 
- lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) + ccw.lb.refreshSubConns(ccw.lb.resolvedBackendAddrs, true, ccw.lb.usePickFirst) } - lb.mu.Unlock() + ccw.lb.mu.Unlock() if !doBackoff { retryCount = 0 continue } - timer := time.NewTimer(lb.backoff.Backoff(retryCount)) + timer := time.NewTimer(ccw.backoff.Backoff(retryCount)) // Copy backoff select { case <-timer.C: - case <-lb.doneCh: + case <-ccw.done: timer.Stop() return } retryCount++ } } - -func (lb *lbBalancer) dialRemoteLB() { - var dopts []grpc.DialOption - if creds := lb.opt.DialCreds; creds != nil { - dopts = append(dopts, grpc.WithTransportCredentials(creds)) - } else if bundle := lb.grpclbClientConnCreds; bundle != nil { - dopts = append(dopts, grpc.WithCredentialsBundle(bundle)) - } else { - dopts = append(dopts, grpc.WithInsecure()) - } - if lb.opt.Dialer != nil { - dopts = append(dopts, grpc.WithContextDialer(lb.opt.Dialer)) - } - // Explicitly set pickfirst as the balancer. - dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`)) - wrb := internal.WithResolverBuilder.(func(resolver.Builder) grpc.DialOption) - dopts = append(dopts, wrb(lb.manualResolver)) - if channelz.IsOn() { - dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID)) - } - - // Enable Keepalive for grpclb client. - dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 20 * time.Second, - Timeout: 10 * time.Second, - PermitWithoutStream: true, - })) - - // DialContext using manualResolver.Scheme, which is a random scheme - // generated when init grpclb. The target scheme here is not important. - // - // The grpc dial target will be used by the creds (ALTS) as the authority, - // so it has to be set to remoteLBName that comes from resolver. - cc, err := grpc.DialContext(context.Background(), "grpclb.subClientConn", dopts...) 
- if err != nil { - grpclog.Fatalf("failed to dial: %v", err) - } - lb.ccRemoteLB = cc - go lb.watchRemoteBalancer() -} diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 5ea0cc7d6682..74d69095f343 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -934,11 +934,11 @@ func TestFallBackWithNoServerAddress(t *testing.T) { ServiceConfig: scpr, }) - select { - case <-resolveNowCh: - t.Fatalf("unexpected resolveNow when grpclb gets no balancer address") - case <-time.After(time.Second): - } + // select { + // case <-resolveNowCh: + // t.Fatalf("unexpected resolveNow when grpclb gets no balancer address") + // case <-time.After(time.Second): + // } var p peer.Peer rpcCtx, rpcCancel := context.WithTimeout(context.Background(), time.Second) From e49600987ceaa9fe0ba3e5a5428067fd2096ea20 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 22 Oct 2019 16:11:47 -0700 Subject: [PATCH 4/8] 1 --- balancer/grpclb/grpclb_test.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 74d69095f343..6fd96d282b3e 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -934,11 +934,11 @@ func TestFallBackWithNoServerAddress(t *testing.T) { ServiceConfig: scpr, }) - // select { - // case <-resolveNowCh: - // t.Fatalf("unexpected resolveNow when grpclb gets no balancer address") - // case <-time.After(time.Second): - // } + select { + case <-resolveNowCh: + t.Errorf("unexpected resolveNow when grpclb gets no balancer address 1111") + case <-time.After(time.Second): + } var p peer.Peer rpcCtx, rpcCancel := context.WithTimeout(context.Background(), time.Second) @@ -950,6 +950,12 @@ func TestFallBackWithNoServerAddress(t *testing.T) { t.Fatalf("got peer: %v, want peer: %v", p.Addr, beLis.Addr()) } + select { + case <-resolveNowCh: + t.Errorf("unexpected resolveNow when grpclb gets no balancer address 
2222") + case <-time.After(time.Second): + } + tss.ls.sls <- sl // Send an update with balancer address. The backends behind grpclb should // be used. From a49355385b4f1ea7d79366b1d8374377bd5cf16f Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 22 Oct 2019 16:17:17 -0700 Subject: [PATCH 5/8] 2 --- balancer/grpclb/grpclb.go | 8 ++++---- balancer/grpclb/grpclb_remote_balancer.go | 11 +++++++++++ balancer/grpclb/grpclb_test.go | 4 ++-- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 4aa5384d69a2..4f8527c487e0 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -441,10 +441,10 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error } } - // if len(remoteBalancerAddrs) == 0 && lb.ccRemoteLB != nil { - // lb.ccRemoteLB.close() - // lb.ccRemoteLB = nil - // } + if len(remoteBalancerAddrs) == 0 && lb.ccRemoteLB != nil { + lb.ccRemoteLB.close() + lb.ccRemoteLB = nil + } if lb.ccRemoteLB == nil { // First time receiving resolved addresses, create a cc to remote diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index b326ffdaa7b8..748d145191dc 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "net" + "sync" "time" "github.com/golang/protobuf/proto" @@ -207,6 +208,9 @@ type remoteBalancerCCWrapper struct { lb *lbBalancer backoff backoff.Strategy done chan struct{} + + // waitgroup to wait for all goroutines to exit. + wg sync.WaitGroup } func (lb *lbBalancer) newRemoteBalancerCCWrapper() { @@ -255,9 +259,12 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() { go ccw.watchRemoteBalancer() } +// close closes the ClientConn to the remote balancer, and waits for all +// goroutines to finish. 
func (ccw *remoteBalancerCCWrapper) close() { close(ccw.done) ccw.cc.Close() + ccw.wg.Wait() } func (ccw *remoteBalancerCCWrapper) readServerList(s *balanceLoadClientStream) error { @@ -336,6 +343,8 @@ func (ccw *remoteBalancerCCWrapper) callRemoteBalancer() (backoff bool, _ error) } go func() { + ccw.wg.Add(1) + defer ccw.wg.Done() if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 { ccw.sendLoadReport(stream, d) } @@ -345,6 +354,8 @@ func (ccw *remoteBalancerCCWrapper) callRemoteBalancer() (backoff bool, _ error) } func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() { + ccw.wg.Add(1) + defer ccw.wg.Done() var retryCount int for { doBackoff, err := ccw.callRemoteBalancer() diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 6fd96d282b3e..ffb456ceac30 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -936,7 +936,7 @@ func TestFallBackWithNoServerAddress(t *testing.T) { select { case <-resolveNowCh: - t.Errorf("unexpected resolveNow when grpclb gets no balancer address 1111") + t.Errorf("unexpected resolveNow when grpclb gets no balancer address 1111, %d", i) case <-time.After(time.Second): } @@ -952,7 +952,7 @@ func TestFallBackWithNoServerAddress(t *testing.T) { select { case <-resolveNowCh: - t.Errorf("unexpected resolveNow when grpclb gets no balancer address 2222") + t.Errorf("unexpected resolveNow when grpclb gets no balancer address 2222, %d", i) case <-time.After(time.Second): } From 50678f3f5f34d8011c1a88bf3b6c6c873876513f Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Tue, 22 Oct 2019 16:21:39 -0700 Subject: [PATCH 6/8] wait for remote CC to close --- balancer/grpclb/grpclb.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 4f8527c487e0..9727e12f34a6 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -441,12 +441,12 @@ func (lb *lbBalancer) 
UpdateClientConnState(ccs balancer.ClientConnState) error } } - if len(remoteBalancerAddrs) == 0 && lb.ccRemoteLB != nil { - lb.ccRemoteLB.close() - lb.ccRemoteLB = nil - } - - if lb.ccRemoteLB == nil { + if len(remoteBalancerAddrs) == 0 { + if lb.ccRemoteLB != nil { + lb.ccRemoteLB.close() + lb.ccRemoteLB = nil + } + } else if lb.ccRemoteLB == nil { // First time receiving resolved addresses, create a cc to remote // balancers. lb.newRemoteBalancerCCWrapper() @@ -454,12 +454,11 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error go lb.fallbackToBackendsAfter(lb.fallbackTimeout) } - // cc to remote balancers uses lb.manualResolver. Send the updated remote - // balancer addresses to it through manualResolver. - // - // If remoteBalancerAddrs is empty, this could trigger resolveNow() in the - // sub-ClientConn. - lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs}) + if lb.ccRemoteLB != nil { + // cc to remote balancers uses lb.manualResolver. Send the updated remote + // balancer addresses to it through manualResolver. 
+ lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs}) + } lb.mu.Lock() lb.resolvedBackendAddrs = backendAddrs From 55187161ca3f47e5c3f7f97181b87dbe4c1f59d2 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 23 Oct 2019 10:15:45 -0700 Subject: [PATCH 7/8] waitgroup and vet --- balancer/grpclb/grpclb_remote_balancer.go | 9 +++++---- vet.sh | 1 + 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 748d145191dc..3bd853d46f1d 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -110,8 +110,9 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback fallbackModeChanged := lb.inFallback != fallback lb.inFallback = fallback if fallbackModeChanged && lb.inFallback { - // Clear previous received list, so if the server is back and sends the - // same list again, the new addresses will be used. + // Clear previous received list when entering fallback, so if the server + // comes back and sends the same list again, the new addresses will be + // used. 
lb.fullServerList = nil } @@ -256,6 +257,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() { done: make(chan struct{}), } lb.ccRemoteLB = ccw + ccw.wg.Add(1) go ccw.watchRemoteBalancer() } @@ -342,8 +344,8 @@ func (ccw *remoteBalancerCCWrapper) callRemoteBalancer() (backoff bool, _ error) return true, fmt.Errorf("grpclb: Delegation is not supported") } + ccw.wg.Add(1) go func() { - ccw.wg.Add(1) defer ccw.wg.Done() if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 { ccw.sendLoadReport(stream, d) @@ -354,7 +356,6 @@ func (ccw *remoteBalancerCCWrapper) callRemoteBalancer() (backoff bool, _ error) } func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() { - ccw.wg.Add(1) defer ccw.wg.Done() var retryCount int for { diff --git a/vet.sh b/vet.sh index 7bbcea97fafd..f324be509a47 100755 --- a/vet.sh +++ b/vet.sh @@ -115,6 +115,7 @@ fi staticcheck -go 1.9 -checks 'inherit,-ST1015' -ignore ' google.golang.org/grpc/balancer.go:SA1019 google.golang.org/grpc/balancer/grpclb/grpclb_remote_balancer.go:SA1019 +google.golang.org/grpc/balancer/grpclb/grpclb_test.go:SA1019 google.golang.org/grpc/balancer/roundrobin/roundrobin_test.go:SA1019 google.golang.org/grpc/xds/internal/balancer/edsbalancer/balancergroup.go:SA1019 google.golang.org/grpc/xds/internal/resolver/xds_resolver.go:SA1019 From e86ba20e3b9ea9943d8f04eb69c5868b87e9b354 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 30 Oct 2019 11:59:51 -0700 Subject: [PATCH 8/8] comments (on comments) --- balancer/grpclb/grpclb.go | 2 +- balancer/grpclb/grpclb_remote_balancer.go | 7 +++---- balancer/grpclb/grpclb_test.go | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index 9727e12f34a6..e79b1a7bfdc5 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -426,7 +426,7 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error addrs := ccs.ResolverState.Addresses if len(addrs) == 0 { - // 
There's should be at least one address, either grpclb server or + // There should be at least one address, either grpclb server or // fallback. Empty address is not valid. return balancer.ErrBadResolverState } diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 3bd853d46f1d..897ca8ff4729 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -241,11 +241,10 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() { PermitWithoutStream: true, })) - // DialContext using manualResolver.Scheme, which is a random scheme - // generated when init grpclb. The target scheme here is not important. + // The dial target is not important. // - // The grpc dial target will be used by the creds (ALTS) as the authority, - // so it has to be set to remoteLBName that comes from resolver. + // The grpclb server addresses will set field ServerName, and creds will + // receive ServerName as authority. cc, err := grpc.DialContext(context.Background(), "grpclb.subClientConn", dopts...) if err != nil { grpclog.Fatalf("failed to dial: %v", err) diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index ffb456ceac30..82eb153fce1c 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -91,7 +91,7 @@ func (c *serverNameCheckCreds) ClientHandshake(ctx context.Context, authority st } if authority != string(b) { fmt.Printf("test-creds: got authority from ClientConn %q, expected by server %q\n", authority, string(b)) - return nil, nil, errors.New("received unexpected server nameq") + return nil, nil, errors.New("received unexpected server name") } return rawConn, nil, nil }