diff --git a/clientconn.go b/clientconn.go index edbed8762c16..fdc32df1de8d 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1019,35 +1019,8 @@ func (ac *addrConn) resetTransport() { reconnect := grpcsync.NewEvent() prefaceReceived := make(chan struct{}) newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived) - if err == nil { - ac.mu.Lock() - ac.curAddr = addr - ac.transport = newTr - ac.mu.Unlock() - - healthCheckConfig := ac.cc.healthCheckConfig() - // LB channel health checking is only enabled when all the four requirements below are met: - // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption, - // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package, - // 3. a service config with non-empty healthCheckConfig field is provided, - // 4. the current load balancer allows it. - healthcheckManagingState := false - if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled { - if ac.cc.dopts.healthCheckFunc == nil { - // TODO: add a link to the health check doc in the error message. - grpclog.Error("the client side LB channel health check function has not been set.") - } else { - // TODO(deklerk) refactor to just return transport - go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName) - healthcheckManagingState = true - } - } - if !healthcheckManagingState { - ac.mu.Lock() - ac.updateConnectivityState(connectivity.Ready) - ac.mu.Unlock() - } - } else { + if err != nil { + ac.cc.blockingpicker.updateConnectionError(err) hcancel() if err == errConnClosing { return @@ -1060,55 +1033,46 @@ func (ac *addrConn) resetTransport() { } ac.mu.Lock() - reqHandshake := ac.dopts.reqHandshake + ac.curAddr = addr + ac.transport = newTr ac.mu.Unlock() - <-reconnect.Done() - hcancel() - - if reqHandshake == envconfig.RequireHandshakeHybrid { - // In RequireHandshakeHybrid mode, we must check to see whether - // server preface has arrived yet to decide whether to start - // reconnecting at the top of the list (server preface received) - // or continue with the next addr in the list as if the - // connection were not successful (server preface not received). - select { - case <-prefaceReceived: - // We received a server preface - huzzah! We consider this - // a success and restart from the top of the addr list. - ac.mu.Lock() - ac.backoffIdx = 0 - ac.mu.Unlock() - break addrLoop - default: - // Despite having set state to READY, in hybrid mode we - // consider this a failure and continue connecting at the - // next addr in the list. - ac.mu.Lock() - if ac.state == connectivity.Shutdown { - ac.mu.Unlock() - return - } - - ac.updateConnectivityState(connectivity.TransientFailure) - ac.mu.Unlock() - - if tryNextAddrFromStart.HasFired() { - break addrLoop - } + healthCheckConfig := ac.cc.healthCheckConfig() + // LB channel health checking is only enabled when all the four requirements below are met: + // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption, + // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package, + // 3. a service config with non-empty healthCheckConfig field is provided, + // 4. the current load balancer allows it. + healthcheckManagingState := false + if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled { + if ac.cc.dopts.healthCheckFunc == nil { + // TODO: add a link to the health check doc in the error message. + grpclog.Error("the client side LB channel health check function has not been set.") + } else { + // TODO(deklerk) refactor to just return transport + go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName) + healthcheckManagingState = true } - } else { - // In RequireHandshakeOn mode, we would have already waited for - // the server preface, so we consider this a success and restart - // from the top of the addr list. In RequireHandshakeOff mode, - // we don't care to wait for the server preface before - // considering this a success, so we also restart from the top - // of the addr list. + } + if !healthcheckManagingState { ac.mu.Lock() - ac.backoffIdx = 0 + ac.updateConnectivityState(connectivity.Ready) ac.mu.Unlock() - break addrLoop } + + <-reconnect.Done() + hcancel() + + // In RequireHandshakeOn mode, we would have already waited for + // the server preface, so we consider this a success and restart + // from the top of the addr list. In RequireHandshakeOff mode, + // we don't care to wait for the server preface before + // considering this a success, so we also restart from the top + // of the addr list. + ac.mu.Lock() + ac.backoffIdx = 0 + ac.mu.Unlock() + break addrLoop } // After exhausting all addresses, or after need to reconnect after a @@ -1154,8 +1118,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne Authority: ac.cc.authority, } - prefaceTimer := time.NewTimer(time.Until(connectDeadline)) - onGoAway := func(r transport.GoAwayReason) { ac.mu.Lock() ac.adjustParams(r) @@ -1165,13 +1127,11 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne onClose := func() { close(onCloseCalled) - prefaceTimer.Stop() reconnect.Fire() } onPrefaceReceipt := func() { close(prefaceReceived) - prefaceTimer.Stop() } connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) @@ -1181,38 +1141,8 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne } newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose) - - if err == nil { - if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn { - select { - case <-prefaceTimer.C: - // We didn't get the preface in time. - newTr.Close() - err = errors.New("timed out waiting for server handshake") - case <-prefaceReceived: - // We got the preface - huzzah! things are good. - case <-onCloseCalled: - // The transport has already closed - noop. - return nil, errors.New("connection closed") - } - } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid { - go func() { - select { - case <-prefaceTimer.C: - // We didn't get the preface in time. - newTr.Close() - case <-prefaceReceived: - // We got the preface just in the nick of time - huzzah! - case <-onCloseCalled: - // The transport has already closed - noop. - } - }() - } - } - if err != nil { // newTr is either nil, or closed. - ac.cc.blockingpicker.updateConnectionError(err) ac.mu.Lock() if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. @@ -1225,6 +1155,22 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne return nil, err } + if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn { + select { + case <-time.After(connectDeadline.Sub(time.Now())): + // We didn't get the preface in time. + newTr.Close() + grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) + return nil, errors.New("timed out waiting for server handshake") + case <-prefaceReceived: + // We got the preface - huzzah! things are good. + case <-onCloseCalled: + // The transport has already closed - noop. + return nil, errors.New("connection closed") + // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. + } + } + // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport. ac.mu.Lock() if ac.state == connectivity.Shutdown { diff --git a/clientconn_test.go b/clientconn_test.go index 76a4338133f9..b619344ebecc 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -121,15 +121,6 @@ func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) { var allReqHSSettings = []envconfig.RequireHandshakeSetting{ envconfig.RequireHandshakeOff, envconfig.RequireHandshakeOn, - envconfig.RequireHandshakeHybrid, -} -var reqNoHSSettings = []envconfig.RequireHandshakeSetting{ - envconfig.RequireHandshakeOff, - envconfig.RequireHandshakeHybrid, -} -var reqHSBeforeSuccess = []envconfig.RequireHandshakeSetting{ - envconfig.RequireHandshakeOn, - envconfig.RequireHandshakeHybrid, } func (s) TestDialWaitsForServerSettings(t *testing.T) { @@ -332,50 +323,46 @@ func (s) TestDialDoesNotWaitForServerSettings(t *testing.T) { // Restore current setting after test. old := envconfig.RequireHandshake defer func() { envconfig.RequireHandshake = old }() + envconfig.RequireHandshake = envconfig.RequireHandshakeOff - // Test with "off" and "hybrid". - for _, setting := range reqNoHSSettings { - envconfig.RequireHandshake = setting - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening. Err: %v", err) - } - defer lis.Close() - done := make(chan struct{}) - dialDone := make(chan struct{}) - go func() { // Launch the server. - defer func() { - close(done) - }() - conn, err := lis.Accept() - if err != nil { - t.Errorf("Error while accepting. Err: %v", err) - return - } - defer conn.Close() - <-dialDone // Close conn only after dial returns. + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + defer lis.Close() + done := make(chan struct{}) + dialDone := make(chan struct{}) + go func() { // Launch the server. + defer func() { + close(done) }() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock()) - + conn, err := lis.Accept() if err != nil { - t.Fatalf("DialContext returned err =%v; want nil", err) + t.Errorf("Error while accepting. Err: %v", err) + return } - defer client.Close() + defer conn.Close() + <-dialDone // Close conn only after dial returns. + }() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock()) + if err != nil { + t.Fatalf("DialContext returned err =%v; want nil", err) + } + defer client.Close() - if state := client.GetState(); state != connectivity.Ready { - t.Fatalf("client.GetState() = %v; want connectivity.Ready", state) - } - close(dialDone) - <-done + if state := client.GetState(); state != connectivity.Ready { + t.Fatalf("client.GetState() = %v; want connectivity.Ready", state) } + close(dialDone) } func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) { // Restore current setting after test. old := envconfig.RequireHandshake defer func() { envconfig.RequireHandshake = old }() + envconfig.RequireHandshake = envconfig.RequireHandshakeOn // 1. Client connects to a server that doesn't send preface. // 2. After minConnectTimeout(500 ms here), client disconnects and retries. @@ -384,80 +371,75 @@ func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) { cleanup := setMinConnectTimeout(time.Millisecond * 500) defer cleanup() - // Test with "on" and "hybrid". - for _, setting := range reqHSBeforeSuccess { - envconfig.RequireHandshake = setting - - lis, err := net.Listen("tcp", "localhost:0") + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Error while listening. Err: %v", err) + } + var ( + conn2 net.Conn + over uint32 + ) + defer func() { + lis.Close() + // conn2 shouldn't be closed until the client has + // observed a successful test. + if conn2 != nil { + conn2.Close() + } + }() + done := make(chan struct{}) + accepted := make(chan struct{}) + go func() { // Launch the server. + defer close(done) + conn1, err := lis.Accept() if err != nil { - t.Fatalf("Error while listening. Err: %v", err) + t.Errorf("Error while accepting. Err: %v", err) + return } - var ( - conn2 net.Conn - over uint32 - ) - defer func() { - lis.Close() - // conn2 shouldn't be closed until the client has - // observed a successful test. - if conn2 != nil { - conn2.Close() - } - }() - done := make(chan struct{}) - accepted := make(chan struct{}) - go func() { // Launch the server. - defer close(done) - conn1, err := lis.Accept() - if err != nil { - t.Errorf("Error while accepting. Err: %v", err) - return - } - defer conn1.Close() - // Don't send server settings and the client should close the connection and try again. - conn2, err = lis.Accept() // Accept a reconnection request from client. - if err != nil { - t.Errorf("Error while accepting. Err: %v", err) - return + defer conn1.Close() + // Don't send server settings and the client should close the connection and try again. + conn2, err = lis.Accept() // Accept a reconnection request from client. + if err != nil { + t.Errorf("Error while accepting. Err: %v", err) + return + } + close(accepted) + framer := http2.NewFramer(conn2, conn2) + if err = framer.WriteSettings(http2.Setting{}); err != nil { + t.Errorf("Error while writing settings. Err: %v", err) + return + } + b := make([]byte, 8) + for { + _, err = conn2.Read(b) + if err == nil { + continue } - close(accepted) - framer := http2.NewFramer(conn2, conn2) - if err = framer.WriteSettings(http2.Setting{}); err != nil { - t.Errorf("Error while writing settings. Err: %v", err) + if atomic.LoadUint32(&over) == 1 { + // The connection stayed alive for the timer. + // Success. return } - b := make([]byte, 8) - for { - _, err = conn2.Read(b) - if err == nil { - continue - } - if atomic.LoadUint32(&over) == 1 { - // The connection stayed alive for the timer. - // Success. - return - } - t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err) - break - } - }() - client, err := Dial(lis.Addr().String(), WithInsecure()) - if err != nil { - t.Fatalf("Error while dialing. Err: %v", err) + t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err) + break } - // wait for connection to be accepted on the server. - timer := time.NewTimer(time.Second * 10) - select { - case <-accepted: - case <-timer.C: - t.Fatalf("Client didn't make another connection request in time.") - } - // Make sure the connection stays alive for sometime. - time.Sleep(time.Second) - atomic.StoreUint32(&over, 1) - client.Close() - <-done + }() + client, err := Dial(lis.Addr().String(), WithInsecure()) + if err != nil { + t.Fatalf("Error while dialing. Err: %v", err) } + // wait for connection to be accepted on the server. + timer := time.NewTimer(time.Second * 10) + select { + case <-accepted: + case <-timer.C: + t.Fatalf("Client didn't make another connection request in time.") + } + // Make sure the connection stays alive for sometime. + time.Sleep(time.Second) + atomic.StoreUint32(&over, 1) + client.Close() + <-done } func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) { diff --git a/internal/envconfig/envconfig.go b/internal/envconfig/envconfig.go index 62ed0f2f1200..11be7cd08c50 100644 --- a/internal/envconfig/envconfig.go +++ b/internal/envconfig/envconfig.go @@ -34,13 +34,9 @@ const ( type RequireHandshakeSetting int const ( - // RequireHandshakeHybrid (default, deprecated) indicates to not wait for - // handshake before considering a connection ready, but wait before - // considering successful. - RequireHandshakeHybrid RequireHandshakeSetting = iota - // RequireHandshakeOn (default after the 1.17 release) indicates to wait - // for handshake before considering a connection ready/successful. - RequireHandshakeOn + // RequireHandshakeOn indicates to wait for handshake before considering a + // connection ready/successful. + RequireHandshakeOn RequireHandshakeSetting = iota // RequireHandshakeOff indicates to not wait for handshake before // considering a connection ready/successful. RequireHandshakeOff @@ -53,7 +49,7 @@ var ( // environment variable. // // Will be removed after the 1.18 release. - RequireHandshake RequireHandshakeSetting + RequireHandshake = RequireHandshakeOn ) func init() { @@ -64,8 +60,5 @@ func init() { RequireHandshake = RequireHandshakeOn case "off": RequireHandshake = RequireHandshakeOff - case "hybrid": - // Will be removed after the 1.17 release. - RequireHandshake = RequireHandshakeHybrid } } diff --git a/test/gracefulstop_test.go b/test/gracefulstop_test.go index 96b1578a8628..c4128892d022 100644 --- a/test/gracefulstop_test.go +++ b/test/gracefulstop_test.go @@ -27,8 +27,8 @@ import ( "testing" "time" + "golang.org/x/net/http2" "google.golang.org/grpc" - "google.golang.org/grpc/internal/envconfig" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -52,6 +52,13 @@ func (d *delayListener) Accept() (net.Conn, error) { default: close(d.acceptCalled) conn, err := d.Listener.Accept() + if err != nil { + return nil, err + } + framer := http2.NewFramer(conn, conn) + if err = framer.WriteSettings(http2.Setting{}); err != nil { + return nil, err + } // Allow closing of listener only after accept. // Note: Dial can return successfully, yet Accept // might now have finished. @@ -107,10 +114,14 @@ func (d *delayConn) Read(b []byte) (n int, err error) { } func (s) TestGracefulStop(t *testing.T) { - // Set default behavior and restore current setting after test. + // We need to turn off RequireHandshake because if it were on, it would + // block forever waiting to read the handshake, and the delayConn would + // never let it (the delay is intended to block until later in the test). + // + // Restore current setting after test. old := envconfig.RequireHandshake - envconfig.RequireHandshake = envconfig.RequireHandshakeOff defer func() { envconfig.RequireHandshake = old }() + envconfig.RequireHandshake = envconfig.RequireHandshakeOff // This test ensures GracefulStop cannot race and break RPCs on new // connections created after GracefulStop was called but before diff --git a/vet.sh b/vet.sh index 7209aa5b0a44..89c3f72b98e9 100755 --- a/vet.sh +++ b/vet.sh @@ -119,5 +119,6 @@ google.golang.org/grpc/stats/stats_test.go:SA1019 google.golang.org/grpc/test/channelz_test.go:SA1019 google.golang.org/grpc/test/end2end_test.go:SA1019 google.golang.org/grpc/test/healthcheck_test.go:SA1019 +google.golang.org/grpc/clientconn.go:S1024 ' ./... misspell -error .