Skip to content

Commit

Permalink
xds/ringhash: update connectivity state aggregation, and make sure at…
Browse files Browse the repository at this point in the history
… least one SubConn is connecting in TF (#5338)
  • Loading branch information
menghanl authored May 18, 2022
1 parent e23132c commit 333a441
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 24 deletions.
27 changes: 27 additions & 0 deletions xds/internal/balancer/ringhash/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro
return balancer.PickResult{}, fmt.Errorf("no connection is Ready")
}

// nextSkippingDuplicates finds the next entry in the ring, with a different
// subconn from the given entry.
func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
for next := ring.next(entry); next != entry; next = ring.next(next) {
if next.sc != entry.sc {
Expand All @@ -152,3 +154,28 @@ func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
// There's no qualifying next entry.
return nil
}

// nextSkippingDuplicatesSubConn returns a subconn from the ring that differs
// from sc, starting the search at the first ring entry that holds sc.
//
// If sc has no entry in the ring (e.g. it was deleted), the subconn of the
// first ring entry is returned, or nil when the ring has no entries. If sc is
// present but nextSkippingDuplicates finds no entry with a different subconn,
// nil is returned.
func nextSkippingDuplicatesSubConn(ring *ring, sc *subConn) *subConn {
	// Locate the first ring entry that belongs to sc.
	idx := -1
	for i := range ring.items {
		if ring.items[i].sc == sc {
			idx = i
			break
		}
	}

	switch {
	case idx >= 0:
		// sc is in the ring: walk forward to an entry with a different subconn.
		if found := nextSkippingDuplicates(ring, ring.items[idx]); found != nil {
			return found.sc
		}
		return nil
	case len(ring.items) == 0:
		// sc is not in the ring and there is nothing to fall back to.
		return nil
	default:
		// sc is not in the ring: fall back to the first entry's subconn.
		return ring.items[0].sc
	}
}
67 changes: 60 additions & 7 deletions xds/internal/balancer/ringhash/ringhash.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ type subConn struct {
// When connectivity state is updated to Idle for this SubConn, if
// connectQueued is true, Connect() will be called on the SubConn.
connectQueued bool
// attemptingToConnect indicates if this subconn is attempting to connect.
// It's set when queueConnect is called. It's unset when the state is
// changed to Ready/Shutdown, or Idle (and if connectQueued is false).
attemptingToConnect bool
}

// setState updates the state of this SubConn.
Expand All @@ -113,6 +117,8 @@ func (sc *subConn) setState(s connectivity.State) {
if sc.connectQueued {
sc.connectQueued = false
sc.sc.Connect()
} else {
sc.attemptingToConnect = false
}
case connectivity.Connecting:
// Clear connectQueued if the SubConn isn't failing. This state
Expand All @@ -122,11 +128,14 @@ func (sc *subConn) setState(s connectivity.State) {
// Clear connectQueued if the SubConn isn't failing. This state
// transition is unlikely to happen, but handle this just in case.
sc.connectQueued = false
sc.attemptingToConnect = false
// Set to a non-failing state.
sc.failing = false
case connectivity.TransientFailure:
// Set to a failing state.
sc.failing = true
case connectivity.Shutdown:
sc.attemptingToConnect = false
}
sc.state = s
}
Expand All @@ -149,6 +158,7 @@ func (sc *subConn) effectiveState() connectivity.State {
func (sc *subConn) queueConnect() {
sc.mu.Lock()
defer sc.mu.Unlock()
sc.attemptingToConnect = true
if sc.state == connectivity.Idle {
sc.sc.Connect()
return
Expand All @@ -158,6 +168,12 @@ func (sc *subConn) queueConnect() {
sc.connectQueued = true
}

// isAttemptingToConnect reports the current value of sc.attemptingToConnect,
// reading it while holding sc.mu.
func (sc *subConn) isAttemptingToConnect() bool {
	sc.mu.Lock()
	attempting := sc.attemptingToConnect
	sc.mu.Unlock()
	return attempting
}

type ringhashBalancer struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
Expand Down Expand Up @@ -268,7 +284,8 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err
var err error
b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
if err != nil {
panic(err)
b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err))
return balancer.ErrBadResolverState
}
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
Expand Down Expand Up @@ -334,12 +351,6 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance

switch s {
case connectivity.Idle:
// When the overall state is TransientFailure, this will never get picks
// if there's a lower priority. Need to keep the SubConns connecting so
// there's a chance it will recover.
if b.state == connectivity.TransientFailure {
scs.queueConnect()
}
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Connecting:
Expand All @@ -364,6 +375,35 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
if sendUpdate {
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

switch b.state {
case connectivity.Connecting, connectivity.TransientFailure:
// When overall state is TransientFailure, we need to make sure at least
// one SubConn is attempting to connect, otherwise this balancer may
// never get picks if the parent is priority.
//
// Because we report Connecting as the overall state when only one
// SubConn is in TransientFailure, we do the same check for Connecting
// here.
//
// Note that this check also covers deleting SubConns due to address
// change. E.g. if the SubConn attempting to connect is deleted, and the
// overall state is TF. Since there must be at least one SubConn
// attempting to connect, we need to trigger one. But since the deleted
// SubConn will eventually send a shutdown update, this code will run
// and trigger the next SubConn to connect.
for _, sc := range b.subConns {
if sc.isAttemptingToConnect() {
return
}
}
// Trigger a SubConn (this updated SubConn's next SubConn in the ring)
// to connect if nobody is attempting to connect.
sc := nextSkippingDuplicatesSubConn(b.ring, scs)
if sc != nil {
sc.queueConnect()
}
}
}

// mergeErrors builds an error from the last connection error and the last
Expand Down Expand Up @@ -395,6 +435,7 @@ func (b *ringhashBalancer) Close() {}
//
// It's not thread safe.
type connectivityStateEvaluator struct {
// sum is incremented when a transition out of Shutdown is recorded (which
// is how a newly created SubConn is recorded) and decremented when a
// transition into Shutdown is recorded, so it tracks how many SubConns are
// currently being counted.
sum uint64
// nums holds one counter per connectivity state, indexed by the
// connectivity.State value.
// NOTE(review): the length 5 presumably matches the number of
// connectivity states — confirm.
nums [5]uint64
}

Expand All @@ -404,6 +445,7 @@ type connectivityStateEvaluator struct {
// - If there is at least one subchannel in READY state, report READY.
// - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE.
// - If there is at least one subchannel in CONNECTING state, report CONNECTING.
// - If there is one subchannel in TRANSIENT_FAILURE and there is more than one subchannel, report state CONNECTING.
// - If there is at least one subchannel in Idle state, report Idle.
// - Otherwise, report TRANSIENT_FAILURE.
//
Expand All @@ -417,6 +459,14 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
cse.nums[state] += updateVal
}
if oldState == connectivity.Shutdown {
// There's technically no transition from Shutdown. But we record a
// Shutdown->Idle transition when a new SubConn is created.
cse.sum++
}
if newState == connectivity.Shutdown {
cse.sum--
}

if cse.nums[connectivity.Ready] > 0 {
return connectivity.Ready
Expand All @@ -427,6 +477,9 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne
if cse.nums[connectivity.Connecting] > 0 {
return connectivity.Connecting
}
if cse.nums[connectivity.TransientFailure] > 0 && cse.sum > 1 {
return connectivity.Connecting
}
if cse.nums[connectivity.Idle] > 0 {
return connectivity.Idle
}
Expand Down
60 changes: 43 additions & 17 deletions xds/internal/balancer/ringhash/ringhash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ func TestAddrWeightChange(t *testing.T) {
}

// TestSubConnToConnectWhenOverallTransientFailure covers the situation when the
// overall state is TransientFailure, the SubConns turning Idle will be
// triggered to Connect(). But not when the overall state is not
// overall state is TransientFailure, the SubConns turning Idle will trigger the
// next SubConn in the ring to Connect(). But not when the overall state is not
// TransientFailure.
func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
wantAddrs := []resolver.Address{
Expand All @@ -377,30 +377,56 @@ func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
_, b, p0 := setupTest(t, wantAddrs)
ring0 := p0.(*picker).ring

// Turn all SubConns to TransientFailure.
for _, it := range ring0.items {
b.UpdateSubConnState(it.sc.sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
}

// The next one turning Idle should Connect().
// Turn the first subconn to transient failure.
sc0 := ring0.items[0].sc.sc
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the second subconn to connect (because the overall state
// is Connecting when exactly one subconn is in TransientFailure).
sc1 := ring0.items[1].sc.sc
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc1)
}

// Turn the second subconn to TF. This will set the overall state to TF.
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the third subconn to connect.
sc2 := ring0.items[2].sc.sc
select {
case <-sc2.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc2)
}

// Turn the third subconn to TF. This will set the overall state to TF.
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle})

// It will trigger the first subconn to connect.
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for Connect() from SubConn %v", sc0)
}

// If this SubConn is ready. Other SubConns turning Idle will not Connect().
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Turn the third subconn to TF again.
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
b.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Idle})

// The third SubConn in the ring should connect.
sc1 := ring0.items[1].sc.sc
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle})
// This will not trigger any new Connect() on the SubConns, because sc0 is
// still attempting to connect, and we only need one SubConn to connect.
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc0)
case <-sc1.(*testutils.TestSubConn).ConnectCh:
t.Errorf("unexpected Connect() from SubConn %v", sc1)
t.Fatalf("unexpected Connect() from SubConn %v", sc1)
case <-sc2.(*testutils.TestSubConn).ConnectCh:
t.Fatalf("unexpected Connect() from SubConn %v", sc2)
case <-time.After(defaultTestShortTimeout):
}
}
Expand Down

0 comments on commit 333a441

Please sign in to comment.