diff --git a/clientv3/balancer.go b/clientv3/balancer.go index b7767221ac7..6ae047e9841 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -77,7 +77,6 @@ func newSimpleBalancer(eps []string) *simpleBalancer { for i := range eps { addrs[i].Addr = getHost(eps[i]) } - notifyCh <- addrs sb := &simpleBalancer{ addrs: addrs, notifyCh: notifyCh, @@ -89,6 +88,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer { updateAddrsC: make(chan struct{}, 1), host2ep: getHost2ep(eps), } + close(sb.downc) go sb.updateNotifyLoop() return sb } @@ -170,38 +170,51 @@ func (b *simpleBalancer) updateNotifyLoop() { for { b.mu.RLock() - upc := b.upc + upc, downc, addr := b.upc, b.downc, b.pinAddr b.mu.RUnlock() - var downc chan struct{} + // downc or upc should be closed + select { + case <-downc: + downc = nil + default: + } select { case <-upc: - var addr string - b.mu.RLock() - addr = b.pinAddr - // Up() sets pinAddr and downc as a pair under b.mu - downc = b.downc - b.mu.RUnlock() - if addr == "" { - break + upc = nil + default: + } + switch { + case downc == nil && upc == nil: + // stale + select { + case <-b.stopc: + return + default: } - // close opened connections that are not pinAddr - // this ensures only one connection is open per client + case downc == nil: + b.notifyAddrs() select { + case <-upc: + case <-b.updateAddrsC: + b.notifyAddrs() + case <-b.stopc: + return + } + case upc == nil: + select { + // close connections that are not the pinned address case b.notifyCh <- []grpc.Address{{Addr: addr}}: + case <-downc: + case <-b.stopc: + return + } + select { + case <-downc: + case <-b.updateAddrsC: case <-b.stopc: return } - case <-b.updateAddrsC: - b.notifyAddrs() - continue - } - select { - case <-downc: - b.notifyAddrs() - case <-b.updateAddrsC: b.notifyAddrs() - case <-b.stopc: - return } } } @@ -231,23 +244,20 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) { if !hasAddr(b.addrs, addr.Addr) { return func(err error) {} } - - if b.pinAddr == "" { - // notify waiting Get()s and pin first connected address - close(b.upc) - b.downc = make(chan struct{}) - b.pinAddr = addr.Addr - // notify client that a connection is up - b.readyOnce.Do(func() { close(b.readyc) }) + if b.pinAddr != "" { + return func(err error) {} } - + // notify waiting Get()s and pin first connected address + close(b.upc) + b.downc = make(chan struct{}) + b.pinAddr = addr.Addr + // notify client that a connection is up + b.readyOnce.Do(func() { close(b.readyc) }) return func(err error) { b.mu.Lock() - if b.pinAddr == addr.Addr { - b.upc = make(chan struct{}) - close(b.downc) - b.pinAddr = "" - } + b.upc = make(chan struct{}) + close(b.downc) + b.pinAddr = "" b.mu.Unlock() } } @@ -280,6 +290,8 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) b.mu.RUnlock() select { case <-ch: + case <-b.donec: + return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing case <-ctx.Done(): return grpc.Address{Addr: ""}, nil, ctx.Err() } diff --git a/integration/bridge.go b/integration/bridge.go index 09c65aa1f3c..b9e67318e52 100644 --- a/integration/bridge.go +++ b/integration/bridge.go @@ -119,6 +119,7 @@ func (b *bridge) serveListen() { b.mu.Unlock() select { case <-b.stopc: + inc.Close() return case <-pausec: } @@ -152,10 +153,12 @@ func (b *bridge) serveConn(bc *bridgeConn) { wg.Add(2) go func() { io.Copy(bc.out, bc.in) + bc.close() wg.Done() }() go func() { io.Copy(bc.in, bc.out) + bc.close() wg.Done() }() wg.Wait() @@ -168,7 +171,11 @@ type bridgeConn struct { } func (bc *bridgeConn) Close() { + bc.close() + <-bc.donec +} + +func (bc *bridgeConn) close() { bc.in.Close() bc.out.Close() - <-bc.donec }