Skip to content

Commit

Permalink
transport: remove RequireHandshakeHybrid support (#2529)
Browse files Browse the repository at this point in the history
This removes RequireHandshakeHybrid support and changes the default behavior
to RequireHandshakeOn. Dial calls will now block and wait for a successful
handshake before proceeding. Users relying on the old hybrid behavior (cmux
users) should consult soheilhy/cmux#64.

Also, several tests have been updated to take this into consideration by
sending settings frames.
  • Loading branch information
jeanbza authored Feb 27, 2019
1 parent a51d23e commit 5878d96
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 228 deletions.
158 changes: 52 additions & 106 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,35 +1019,8 @@ func (ac *addrConn) resetTransport() {
reconnect := grpcsync.NewEvent()
prefaceReceived := make(chan struct{})
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
if err == nil {
ac.mu.Lock()
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()

healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
}
} else {
if err != nil {
ac.cc.blockingpicker.updateConnectionError(err)
hcancel()
if err == errConnClosing {
return
Expand All @@ -1060,55 +1033,46 @@ func (ac *addrConn) resetTransport() {
}

ac.mu.Lock()
reqHandshake := ac.dopts.reqHandshake
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()

<-reconnect.Done()
hcancel()

if reqHandshake == envconfig.RequireHandshakeHybrid {
// In RequireHandshakeHybrid mode, we must check to see whether
// server preface has arrived yet to decide whether to start
// reconnecting at the top of the list (server preface received)
// or continue with the next addr in the list as if the
// connection were not successful (server preface not received).
select {
case <-prefaceReceived:
// We received a server preface - huzzah! We consider this
// a success and restart from the top of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
default:
// Despite having set state to READY, in hybrid mode we
// consider this a failure and continue connecting at the
// next addr in the list.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}

ac.updateConnectivityState(connectivity.TransientFailure)
ac.mu.Unlock()

if tryNextAddrFromStart.HasFired() {
break addrLoop
}
healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
} else {
// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.backoffIdx = 0
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
break addrLoop
}

<-reconnect.Done()
hcancel()

// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
}

// After exhausting all addresses, or after need to reconnect after a
Expand Down Expand Up @@ -1154,8 +1118,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
Authority: ac.cc.authority,
}

prefaceTimer := time.NewTimer(time.Until(connectDeadline))

onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
Expand All @@ -1165,13 +1127,11 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne

onClose := func() {
close(onCloseCalled)
prefaceTimer.Stop()
reconnect.Fire()
}

onPrefaceReceipt := func() {
close(prefaceReceived)
prefaceTimer.Stop()
}

connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
Expand All @@ -1181,38 +1141,8 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
}

newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)

if err == nil {
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
newTr.Close()
err = errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, errors.New("connection closed")
}
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
go func() {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
newTr.Close()
case <-prefaceReceived:
// We got the preface just in the nick of time - huzzah!
case <-onCloseCalled:
// The transport has already closed - noop.
}
}()
}
}

if err != nil {
// newTr is either nil, or closed.
ac.cc.blockingpicker.updateConnectionError(err)
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
Expand All @@ -1225,6 +1155,22 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
return nil, err
}

if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-time.After(connectDeadline.Sub(time.Now())):
// We didn't get the preface in time.
newTr.Close()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, errors.New("connection closed")
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
}

// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
Expand Down
Loading

0 comments on commit 5878d96

Please sign in to comment.