Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-3156 Detect and discard closed idle connections. #1815

Merged
merged 5 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 51 additions & 10 deletions x/mongo/driver/topology/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -55,7 +56,7 @@ type connection struct {
nc net.Conn // When nil, the connection is closed.
addr address.Address
idleTimeout time.Duration
idleDeadline atomic.Value // Stores a time.Time
idleStart atomic.Value // Stores a time.Time
readTimeout time.Duration
writeTimeout time.Duration
desc description.Server
Expand Down Expand Up @@ -561,25 +562,65 @@ func (c *connection) close() error {
return err
}

// closed reports whether the driver has closed this connection, which is the
// case exactly when the connection's state equals connDisconnected. The state
// is read atomically.
func (c *connection) closed() bool {
	state := atomic.LoadInt64(&c.state)
	return state == connDisconnected
}

// isAlive returns true if the connection is alive and ready to be used for an
// operation.
//
// Note that the liveness check can be slow (at least 1ms), so isAlive only
// checks the liveness of the connection if it's been idle for at least 10
// seconds. For frequently in-use connections, a network error during an
// operation will be the first indication of a dead connection.
func (c *connection) isAlive() bool {
if c.nc == nil {
return false
}

// If the connection has been idle for less than 10 seconds, skip the
// liveness check.
//
// The 10-seconds idle bypass is based on the liveness check implementation
// in the Python Driver. That implementation uses 1 second as the idle
// threshold, but we chose to be more conservative in the Go Driver because
// this is new behavior with unknown side-effects. See
// https://github.com/mongodb/mongo-python-driver/blob/e6b95f65953e01e435004af069a6976473eaf841/pymongo/synchronous/pool.py#L983-L985
idleStart, ok := c.idleStart.Load().(time.Time)
if !ok || idleStart.Add(10*time.Second).After(time.Now()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the significance of 10 seconds?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reference was the PyMongo driver, which checks connection liveness when it's idle for >1 second. I wanted to mitigate the risk that checking liveness too often would cause performance problems (since a liveness check takes at least 1ms), so I increased that threshold to 10s.

return true
}

// Set a 1ms read deadline and attempt to read 1 byte from the connection.
// Expect it to block for 1ms then return a deadline exceeded error. If it
// returns any other error, the connection is not usable, so return false.
// If it doesn't return an error and actually reads data, the connection is
// also not usable, so return false.
//
// Note that we don't need to un-set the read deadline because the "read"
// and "write" methods always reset the deadlines.
err := c.nc.SetReadDeadline(time.Now().Add(1 * time.Millisecond))
if err != nil {
return false
}
var b [1]byte
_, err = c.nc.Read(b[:])
return errors.Is(err, os.ErrDeadlineExceeded)
}

func (c *connection) idleTimeoutExpired() bool {
now := time.Now()
if c.idleTimeout > 0 {
idleDeadline, ok := c.idleDeadline.Load().(time.Time)
if ok && now.After(idleDeadline) {
return true
}
if c.idleTimeout == 0 {
return false
}

return false
idleStart, ok := c.idleStart.Load().(time.Time)
return ok && idleStart.Add(c.idleTimeout).Before(time.Now())
}

func (c *connection) bumpIdleDeadline() {
func (c *connection) bumpIdleStart() {
if c.idleTimeout > 0 {
c.idleDeadline.Store(time.Now().Add(c.idleTimeout))
c.idleStart.Store(time.Now())
}
}

Expand Down
87 changes: 85 additions & 2 deletions x/mongo/driver/topology/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/google/go-cmp/cmp"
"go.mongodb.org/mongo-driver/internal/assert"
"go.mongodb.org/mongo-driver/internal/require"
"go.mongodb.org/mongo-driver/mongo/address"
"go.mongodb.org/mongo-driver/mongo/description"
"go.mongodb.org/mongo-driver/x/mongo/driver"
Expand Down Expand Up @@ -427,7 +428,7 @@ func TestConnection(t *testing.T) {

want := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A}
err := conn.writeWireMessage(context.Background(), want)
noerr(t, err)
require.NoError(t, err)
got := tnc.buf
if !cmp.Equal(got, want) {
t.Errorf("writeWireMessage did not write the proper bytes. got %v; want %v", got, want)
Expand Down Expand Up @@ -624,7 +625,7 @@ func TestConnection(t *testing.T) {
conn.cancellationListener = listener

got, err := conn.readWireMessage(context.Background())
noerr(t, err)
require.NoError(t, err)
if !cmp.Equal(got, want) {
t.Errorf("did not read full wire message. got %v; want %v", got, want)
}
Expand Down Expand Up @@ -1251,3 +1252,85 @@ func (tcl *testCancellationListener) assertCalledOnce(t *testing.T) {
assert.Equal(t, 1, tcl.numListen, "expected Listen to be called once, got %d", tcl.numListen)
assert.Equal(t, 1, tcl.numStopListening, "expected StopListening to be called once, got %d", tcl.numListen)
}

// TestConnection_IsAlive exercises connection.isAlive in four situations: an
// uninitialized connection, an open idle connection, a connection whose remote
// end has been closed, and an idle connection with unread data pending.
//
// Every case that dials a connection backdates idleStart by 11 seconds so the
// connection counts as idle for longer than the 10-second bypass in isAlive
// and the socket is actually probed.
func TestConnection_IsAlive(t *testing.T) {
	t.Parallel()

	t.Run("uninitialized", func(t *testing.T) {
		t.Parallel()

		conn := newConnection("")
		assert.False(t,
			conn.isAlive(),
			"expected isAlive for an uninitialized connection to always return false")
	})

	t.Run("connection open", func(t *testing.T) {
		t.Parallel()

		cleanup := make(chan struct{})
		defer close(cleanup)
		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
			// Keep the connection open until the end of the test.
			<-cleanup
			_ = nc.Close()
		})

		conn := newConnection(address.Address(addr.String()))
		err := conn.connect(context.Background())
		require.NoError(t, err)

		conn.idleStart.Store(time.Now().Add(-11 * time.Second))
		assert.True(t,
			conn.isAlive(),
			"expected isAlive for an open connection to return true")
	})

	t.Run("connection closed", func(t *testing.T) {
		t.Parallel()

		// The handler hands the accepted (remote-side) connection to the test
		// body so the test controls exactly when it is closed.
		conns := make(chan net.Conn)
		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
			conns <- nc
		})

		conn := newConnection(address.Address(addr.String()))
		err := conn.connect(context.Background())
		require.NoError(t, err)

		// Close the connection before calling isAlive.
		nc := <-conns
		err = nc.Close()
		require.NoError(t, err)

		conn.idleStart.Store(time.Now().Add(-11 * time.Second))
		assert.False(t,
			conn.isAlive(),
			"expected isAlive for a closed connection to return false")
	})

	t.Run("connection reads data", func(t *testing.T) {
		t.Parallel()

		cleanup := make(chan struct{})
		defer close(cleanup)

		// written is closed once the handler has finished writing, so the test
		// body can wait for the write instead of racing against it.
		written := make(chan struct{})
		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
			// Write some data to the connection before calling isAlive.
			//
			// The handler receives a connection that only exists after the
			// test body has dialed, so it runs on a goroutine other than the
			// test's. Use assert (non-fatal) rather than require here:
			// fail-fast assertions must only be made from the goroutine
			// running the test.
			_, err := nc.Write([]byte{5, 0, 0, 0, 0})
			assert.NoError(t, err, "expected no error writing to the connection")
			close(written)

			// Keep the connection open until the end of the test.
			<-cleanup
			_ = nc.Close()
		})

		conn := newConnection(address.Address(addr.String()))
		err := conn.connect(context.Background())
		require.NoError(t, err)

		// Wait until the remote side has written its data so that the probe in
		// isAlive does not run ahead of the write.
		<-written

		conn.idleStart.Store(time.Now().Add(-11 * time.Second))
		assert.False(t,
			conn.isAlive(),
			"expected isAlive for an open connection that reads data to return false")
	})
}
23 changes: 14 additions & 9 deletions x/mongo/driver/topology/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,11 @@ type reason struct {
// connectionPerished checks if a given connection is perished and should be removed from the pool.
func connectionPerished(conn *connection) (reason, bool) {
switch {
case conn.closed():
// A connection would only be closed if it encountered a network error during an operation and closed itself.
case conn.closed() || !conn.isAlive():
// A connection would only be closed if it encountered a network error
// during an operation and closed itself. If a connection is not alive
// (e.g. the connection was closed by the server-side), it's also
// considered a network error.
return reason{
loggerConn: logger.ReasonConnClosedError,
event: event.ReasonError,
Expand Down Expand Up @@ -898,13 +901,15 @@ func (p *pool) checkInNoEvent(conn *connection) error {
return nil
}

// Bump the connection idle deadline here because we're about to make the connection "available".
// The idle deadline is used to determine when a connection has reached its max idle time and
// should be closed. A connection reaches its max idle time when it has been "available" in the
// idle connections stack for more than the configured duration (maxIdleTimeMS). Set it before
// we call connectionPerished(), which checks the idle deadline, because a newly "available"
// connection should never be perished due to max idle time.
conn.bumpIdleDeadline()
// Bump the connection idle start time here because we're about to make the
// connection "available". The idle start time is used to determine how long
// a connection has been idle and when it has reached its max idle time and
// should be closed. A connection reaches its max idle time when it has been
// "available" in the idle connections stack for more than the configured
// duration (maxIdleTimeMS). Set it before we call connectionPerished(),
// which checks the idle timeout against the idle start time, because a
// newly "available" connection should never be perished due to max idle time.
conn.bumpIdleStart()

r, perished := connectionPerished(conn)
if !perished && conn.pool.getState() == poolClosed {
Expand Down
Loading
Loading