Skip to content

Commit

Permalink
clientv3: don't race on upc/downc/switch endpoints in balancer
Browse files Browse the repository at this point in the history
If the balancer update notification loop starts with a downed
connection and endpoints are switched while the old connection is up,
the balancer can potentially wait forever for an up connection without
refreshing the connections to reflect the current endpoints.

Instead, fetch upc/downc together, so that each iteration only cares
about a single transition, either from down->up or from up->down.

Simple way to reproduce failures: add time.Sleep(time.Second) to the
beginning of the update notification loop.
  • Loading branch information
Anthony Romano committed May 1, 2017
1 parent f0ca65a commit 343d13b
Showing 1 changed file with 48 additions and 32 deletions.
80 changes: 48 additions & 32 deletions clientv3/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
updateAddrsC: make(chan struct{}, 1),
host2ep: getHost2ep(eps),
}
close(sb.downc)
go sb.updateNotifyLoop()
return sb
}
Expand Down Expand Up @@ -170,38 +171,54 @@ func (b *simpleBalancer) updateNotifyLoop() {

for {
b.mu.RLock()
upc := b.upc
upc, downc, addr := b.upc, b.downc, b.pinAddr
b.mu.RUnlock()
var downc chan struct{}
// downc or upc should be closed
select {
case <-downc:
downc = nil
default:
}
select {
case <-upc:
var addr string
b.mu.RLock()
addr = b.pinAddr
// Up() sets pinAddr and downc as a pair under b.mu
downc = b.downc
b.mu.RUnlock()
if addr == "" {
break
upc = nil
case <-b.stopc:
return
default:
}
switch {
case downc == nil && upc == nil:
// stale
case downc == nil:
b.notifyAddrs()
select {
case <-upc:
// close opened connections that are not pinAddr
// this ensures only one connection is open per client
select {
case b.notifyCh <- []grpc.Address{{Addr: addr}}:
case <-b.stopc:
return
}
case <-b.updateAddrsC:
b.notifyAddrs()
continue
case <-b.stopc:
return
}
// close opened connections that are not pinAddr
// this ensures only one connection is open per client
case upc == nil:
select {
case b.notifyCh <- []grpc.Address{{Addr: addr}}:
case <-b.stopc:
return
}
case <-b.updateAddrsC:
b.notifyAddrs()
continue
}
select {
case <-downc:
b.notifyAddrs()
case <-b.updateAddrsC:
b.notifyAddrs()
case <-b.stopc:
return
select {
case <-downc:
case <-b.updateAddrsC:
b.notifyAddrs()
case <-b.stopc:
return
}
}
}
}
Expand Down Expand Up @@ -231,16 +248,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
if !hasAddr(b.addrs, addr.Addr) {
return func(err error) {}
}

if b.pinAddr == "" {
// notify waiting Get()s and pin first connected address
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
if b.pinAddr != "" {
return func(err error) {}
}

// notify waiting Get()s and pin first connected address
close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr
// notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) })
return func(err error) {
b.mu.Lock()
if b.pinAddr == addr.Addr {
Expand Down

0 comments on commit 343d13b

Please sign in to comment.