diff --git a/clientv3/balancer.go b/clientv3/balancer.go index b7767221ac7a..b9d543b49ea3 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -89,6 +89,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer { updateAddrsC: make(chan struct{}, 1), host2ep: getHost2ep(eps), } + close(sb.downc) go sb.updateNotifyLoop() return sb } @@ -170,38 +171,55 @@ func (b *simpleBalancer) updateNotifyLoop() { for { b.mu.RLock() - upc := b.upc + upc, downc := b.upc, b.downc b.mu.RUnlock() - var downc chan struct{} + // downc or upc should be closed + select { + case <-downc: + downc = nil + default: + } select { case <-upc: - var addr string - b.mu.RLock() - addr = b.pinAddr - // Up() sets pinAddr and downc as a pair under b.mu - downc = b.downc - b.mu.RUnlock() - if addr == "" { - break + upc = nil + case <-b.stopc: + return + default: + } + switch { + case downc == nil && upc == nil: + // stale + case downc == nil: + b.notifyAddrs() + select { + case <-upc: + b.mu.RLock() + addr := b.pinAddr + b.mu.RUnlock() + if addr == "" { + break + } + // close opened connections that are not pinAddr + // this ensures only one connection is open per client + select { + case b.notifyCh <- []grpc.Address{{Addr: addr}}: + case <-b.stopc: + return + } + case <-b.updateAddrsC: + b.notifyAddrs() + continue + case <-b.stopc: + return } - // close opened connections that are not pinAddr - // this ensures only one connection is open per client + case upc == nil: select { - case b.notifyCh <- []grpc.Address{{Addr: addr}}: + case <-downc: + case <-b.updateAddrsC: + b.notifyAddrs() case <-b.stopc: return } - case <-b.updateAddrsC: - b.notifyAddrs() - continue - } - select { - case <-downc: - b.notifyAddrs() - case <-b.updateAddrsC: - b.notifyAddrs() - case <-b.stopc: - return } } } @@ -231,16 +249,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) { if !hasAddr(b.addrs, addr.Addr) { return func(err error) {} } - - if b.pinAddr == "" { - // notify waiting Get()s and pin first connected address - close(b.upc) - b.downc = make(chan struct{}) - b.pinAddr = addr.Addr - // notify client that a connection is up - b.readyOnce.Do(func() { close(b.readyc) }) + if b.pinAddr != "" { + return func(err error) {} } - + // notify waiting Get()s and pin first connected address + close(b.upc) + b.downc = make(chan struct{}) + b.pinAddr = addr.Addr + // notify client that a connection is up + b.readyOnce.Do(func() { close(b.readyc) }) return func(err error) { b.mu.Lock() if b.pinAddr == addr.Addr {