Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: remove RequireHandshakeHybrid support #2529

Merged
merged 1 commit into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 52 additions & 106 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,35 +1019,8 @@ func (ac *addrConn) resetTransport() {
reconnect := grpcsync.NewEvent()
prefaceReceived := make(chan struct{})
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
if err == nil {
ac.mu.Lock()
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()

healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
}
} else {
if err != nil {
ac.cc.blockingpicker.updateConnectionError(err)
hcancel()
if err == errConnClosing {
return
Expand All @@ -1060,55 +1033,46 @@ func (ac *addrConn) resetTransport() {
}

ac.mu.Lock()
reqHandshake := ac.dopts.reqHandshake
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()

<-reconnect.Done()
hcancel()

if reqHandshake == envconfig.RequireHandshakeHybrid {
// In RequireHandshakeHybrid mode, we must check to see whether
// server preface has arrived yet to decide whether to start
// reconnecting at the top of the list (server preface received)
// or continue with the next addr in the list as if the
// connection were not successful (server preface not received).
select {
case <-prefaceReceived:
// We received a server preface - huzzah! We consider this
// a success and restart from the top of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
default:
// Despite having set state to READY, in hybrid mode we
// consider this a failure and continue connecting at the
// next addr in the list.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}

ac.updateConnectivityState(connectivity.TransientFailure)
ac.mu.Unlock()

if tryNextAddrFromStart.HasFired() {
break addrLoop
}
healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
} else {
// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.backoffIdx = 0
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
break addrLoop
}

<-reconnect.Done()
hcancel()

// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
}

// After exhausting all addresses, or after need to reconnect after a
Expand Down Expand Up @@ -1154,8 +1118,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
Authority: ac.cc.authority,
}

prefaceTimer := time.NewTimer(time.Until(connectDeadline))

onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
Expand All @@ -1165,13 +1127,11 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne

onClose := func() {
close(onCloseCalled)
prefaceTimer.Stop()
reconnect.Fire()
}

onPrefaceReceipt := func() {
close(prefaceReceived)
prefaceTimer.Stop()
}

connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
Expand All @@ -1181,38 +1141,8 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
}

newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)

if err == nil {
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
newTr.Close()
err = errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, errors.New("connection closed")
}
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
go func() {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
newTr.Close()
case <-prefaceReceived:
// We got the preface just in the nick of time - huzzah!
case <-onCloseCalled:
// The transport has already closed - noop.
}
}()
}
}

if err != nil {
// newTr is either nil, or closed.
ac.cc.blockingpicker.updateConnectionError(err)
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
Expand All @@ -1225,6 +1155,22 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
return nil, err
}

if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-time.After(connectDeadline.Sub(time.Now())):
// We didn't get the preface in time.
newTr.Close()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, errors.New("connection closed")
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
}

// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
Expand Down
Loading