Skip to content

Commit

Permalink
Add tests for listener Close + invoke only pool release
Browse files Browse the repository at this point in the history
Here, follow up #246 by adding a few more tests that verify a listener's
state after `Close` has been invoked, including if it returned an error,
which we're able to simulate by overriding pgx's `DialFunc` and
returning a stand-in stub for an underlying `net.Conn`.

Also, remove the explicit `Close` call on an underlying connection in
favor of just invoking the pool's `Release` function. In case of an
error condition, `Release` will detect that and do the right thing, and
pgx is better tested/vetted to make sure that right thing happens.
  • Loading branch information
brandur committed Mar 2, 2024
1 parent 6b03de1 commit 84fa0e8
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 7 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ env:
# A suitable URL for non-test database.
DATABASE_URL: postgres://postgres:[email protected]:5432/river_dev?sslmode=disable

# Test database.
TEST_DATABASE_URL: postgres://postgres:[email protected]:5432/river_testdb?sslmode=disable

on:
push:
branches:
Expand Down Expand Up @@ -65,9 +68,8 @@ jobs:
PGSSLMODE: disable

- name: Test
working-directory: .
run: go test -p 1 -race ./...
env:
TEST_DATABASE_URL: postgres://postgres:[email protected]:5432/river_testdb?sslmode=disable

- name: Test cmd/river
working-directory: ./cmd/river
Expand Down
6 changes: 1 addition & 5 deletions riverdriver/riverpgxv5/river_pgx_v5_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,14 +475,10 @@ func (l *Listener) Close(ctx context.Context) error {
return nil
}

err := l.conn.Conn().Close(ctx)

// Regardless of the error state returned above, always release and unset
// the listener's local connection.
l.conn.Release()
l.conn = nil

return err
return nil
}

func (l *Listener) Connect(ctx context.Context) error {
Expand Down
140 changes: 140 additions & 0 deletions riverdriver/riverpgxv5/river_pgx_v5_driver_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package riverpgxv5

import (
"context"
"errors"
"fmt"
"net"
"os"
"testing"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -34,10 +39,145 @@ func TestNew(t *testing.T) {
})
}

func TestListener_Close(t *testing.T) {
t.Parallel()

ctx := context.Background()

t.Run("NoOpWithoutConn", func(t *testing.T) {
t.Parallel()

listener := &Listener{dbPool: testPool(ctx, t, nil)}
require.Nil(t, listener.conn)
require.NoError(t, listener.Close(ctx))
})

t.Run("ReleasesAndUnsetsConn", func(t *testing.T) {
t.Parallel()

config := testPoolConfig()

releaseInvoked := make(chan struct{})

// pgx calls AfterRelease in a goroutine, which is why we communicate with a channel.
config.AfterRelease = func(c *pgx.Conn) bool {
close(releaseInvoked)
return true
}

listener := &Listener{dbPool: testPool(ctx, t, config)}

require.NoError(t, listener.Connect(ctx))
require.NotNil(t, listener.conn)

require.NoError(t, listener.Close(ctx))

// Connection has been released.
select {
case <-releaseInvoked:
case <-time.After(3 * time.Second):
require.FailNow(t, "Timed out waiting for connection to be released")
}

// Despite error, internal connection still unset.
require.Nil(t, listener.conn)
})

t.Run("UnsetsConnEvenOnError", func(t *testing.T) {
t.Parallel()

var connStub *connStub

config := testPoolConfig()
config.ConnConfig.DialFunc = func(ctx context.Context, network, addr string) (net.Conn, error) {
// Dialer settings come from pgx's default internal one (not exported unfortunately).
conn, err := (&net.Dialer{KeepAlive: 5 * time.Minute}).Dial(network, addr)
if err != nil {
return nil, err
}

connStub = newConnStub(conn)
return connStub, nil
}

listener := &Listener{dbPool: testPool(ctx, t, config)}

require.NoError(t, listener.Connect(ctx))
require.NotNil(t, listener.conn)

conn := listener.conn.Conn()

expectedErr := errors.New("conn close error")
connStub.closeFunc = func() error {
t.Logf("Close invoked; returning error")
return expectedErr
}

// Error isn't passed through because we invoke pgxpool.Release.
require.NoError(t, listener.Close(ctx))

// Despite error, internal connection still unset.
require.Nil(t, listener.conn)

// Make sure that our stubbing above worked by closing the connection we
// retained and verifying it's the error we set.
require.ErrorIs(t, conn.Close(ctx), expectedErr)
})
}

func TestInterpretError(t *testing.T) {
t.Parallel()

require.EqualError(t, interpretError(errors.New("an error")), "an error")
require.ErrorIs(t, interpretError(pgx.ErrNoRows), rivertype.ErrNotFound)
require.NoError(t, interpretError(nil))
}

// connStub implements net.Conn and allows us to stub particular functions like
// Close that are otherwise nigh impossible to test.
type connStub struct {
net.Conn

closeFunc func() error
}

func newConnStub(conn net.Conn) *connStub {
return &connStub{
Conn: conn,

closeFunc: conn.Close,
}
}

func (c *connStub) Close() error {
return c.closeFunc()
}

func testPoolConfig() *pgxpool.Config {
databaseURL := "postgres://localhost/river_testdb?sslmode=disable"
if url := os.Getenv("TEST_DATABASE_URL"); url != "" {
databaseURL = url
}

config, err := pgxpool.ParseConfig(databaseURL)
if err != nil {
panic(fmt.Sprintf("error parsing database URL: %v", err))
}
config.ConnConfig.ConnectTimeout = 10 * time.Second
config.ConnConfig.RuntimeParams["timezone"] = "UTC"

return config
}

func testPool(ctx context.Context, t *testing.T, config *pgxpool.Config) *pgxpool.Pool {
t.Helper()

if config == nil {
config = testPoolConfig()
}

dbPool, err := pgxpool.NewWithConfig(ctx, config)
require.NoError(t, err)
t.Cleanup(dbPool.Close)
return dbPool
}

0 comments on commit 84fa0e8

Please sign in to comment.