From 78fba8b1cae96a131e4bb60c178ceae8399096dd Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 28 Apr 2017 16:57:22 -0700 Subject: [PATCH] clientv3: don't race on upc/downc/switch endpoints in balancer If the balancer update notification loop starts with a downed connection and endpoints are switched while the old connection is up, the balancer can potentially wait forever for an up connection without refreshing the connections to reflect the current endpoints. Instead, fetch upc/downc together, only caring about a single transition either from down->up or up->down for each iteration Simple way to reproduce failures: add time.Sleep(time.Second) to the beginning of the update notification loop. --- clientv3/balancer.go | 83 ++++++++++++++++++++++++++------------------ 1 file changed, 50 insertions(+), 33 deletions(-) diff --git a/clientv3/balancer.go b/clientv3/balancer.go index b7767221ac7a..b9d543b49ea3 100644 --- a/clientv3/balancer.go +++ b/clientv3/balancer.go @@ -89,6 +89,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer { updateAddrsC: make(chan struct{}, 1), host2ep: getHost2ep(eps), } + close(sb.downc) go sb.updateNotifyLoop() return sb } @@ -170,38 +171,55 @@ func (b *simpleBalancer) updateNotifyLoop() { for { b.mu.RLock() - upc := b.upc + upc, downc := b.upc, b.downc b.mu.RUnlock() - var downc chan struct{} + // downc or upc should be closed + select { + case <-downc: + downc = nil + default: + } select { case <-upc: - var addr string - b.mu.RLock() - addr = b.pinAddr - // Up() sets pinAddr and downc as a pair under b.mu - downc = b.downc - b.mu.RUnlock() - if addr == "" { - break + upc = nil + case <-b.stopc: + return + default: + } + switch { + case downc == nil && upc == nil: + // stale + case downc == nil: + b.notifyAddrs() + select { + case <-upc: + b.mu.RLock() + addr := b.pinAddr + b.mu.RUnlock() + if addr == "" { + break + } + // close opened connections that are not pinAddr + // this ensures only one connection is open per client + select { + case b.notifyCh <- []grpc.Address{{Addr: addr}}: + case <-b.stopc: + return + } + case <-b.updateAddrsC: + b.notifyAddrs() + continue + case <-b.stopc: + return } - // close opened connections that are not pinAddr - // this ensures only one connection is open per client + case upc == nil: select { - case b.notifyCh <- []grpc.Address{{Addr: addr}}: + case <-downc: + case <-b.updateAddrsC: + b.notifyAddrs() case <-b.stopc: return } - case <-b.updateAddrsC: - b.notifyAddrs() - continue - } - select { - case <-downc: - b.notifyAddrs() - case <-b.updateAddrsC: - b.notifyAddrs() - case <-b.stopc: - return } } } @@ -231,16 +249,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) { if !hasAddr(b.addrs, addr.Addr) { return func(err error) {} } - - if b.pinAddr == "" { - // notify waiting Get()s and pin first connected address - close(b.upc) - b.downc = make(chan struct{}) - b.pinAddr = addr.Addr - // notify client that a connection is up - b.readyOnce.Do(func() { close(b.readyc) }) + if b.pinAddr != "" { + return func(err error) {} } - + // notify waiting Get()s and pin first connected address + close(b.upc) + b.downc = make(chan struct{}) + b.pinAddr = addr.Addr + // notify client that a connection is up + b.readyOnce.Do(func() { close(b.readyc) }) return func(err error) { b.mu.Lock() if b.pinAddr == addr.Addr {