Skip to content

Commit

Permalink
deprecate MigrateTx, convert tests to use schemas
Browse files Browse the repository at this point in the history
As detailed in #600, there are certain combinations of schema changes
which are not allowed to be run within the same transaction. The example
we encountered with #590 is adding a new enum value, then using it in an
immutable function during a subsequent migration. In Postgres, these
must be separated by a commit.

There are other examples of things which cannot be run in a transaction,
such as `CREATE INDEX CONCURRENTLY`. While that specific one isn't
solved here, moving away from a migrator that bundles migrations into a
single transaction will also allow us to update our migration system to
exclude certain migrations from transactions and i.e. add indexes
concurrently.
  • Loading branch information
bgentry committed Sep 18, 2024
1 parent 01044a2 commit 2ceb8bb
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 141 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
}
```

- **Deprecated**: The `MigrateTx` method of `rivermigrate` has been deprecated. It turns out there are certain combinations of schema changes which cannot be run within a single transaction, and the migrator now prefers to run each migration in its own transaction, one-at-a-time. `MigrateTx` will be removed in future version.

- The migrator now produces a better error in case of a non-existent migration line including suggestions for known migration lines that are similar in name to the invalid one. [PR #558](https://github.com/riverqueue/river/pull/558).

## Fixed
Expand Down
23 changes: 12 additions & 11 deletions rivermigrate/example_migrate_database_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@ import (
func Example_migrateDatabaseSQL() {
ctx := context.Background()

dbPool, err := sql.Open("pgx", riverinternaltest.DatabaseURL("river_test_example"))
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
schemaName := "migration_example_dbsql"
url := riverinternaltest.DatabaseURL("river_test_example") + "&search_path=" + schemaName
dbPool, err := sql.Open("pgx", url)
if err != nil {
panic(err)
}
defer dbPool.Close()

tx, err := dbPool.BeginTx(ctx, nil)
driver := riverdatabasesql.New(dbPool)
migrator, err := rivermigrate.New(driver, nil)
if err != nil {
panic(err)
}
defer tx.Rollback()

migrator, err := rivermigrate.New(riverdatabasesql.New(dbPool), nil)
if err != nil {
// Create the schema used for this example. Drop it when we're done.
// This isn't necessary outside this test.
if _, err := dbPool.ExecContext(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
panic(err)
}

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)
defer dropRiverSchema(ctx, driver, schemaName)

printVersions := func(res *rivermigrate.MigrateResult) {
for _, version := range res.Versions {
Expand All @@ -47,7 +48,7 @@ func Example_migrateDatabaseSQL() {

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
TargetVersion: 3,
})
if err != nil {
Expand All @@ -57,7 +58,7 @@ func Example_migrateDatabaseSQL() {

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: 3,
})
if err != nil {
Expand Down
32 changes: 17 additions & 15 deletions rivermigrate/example_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
)
Expand All @@ -17,26 +18,29 @@ import (
func Example_migrate() {
ctx := context.Background()

dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_test_example"))
// Use a dedicated Postgres schema for this example so we can migrate and drop it at will:
schemaName := "migration_example"
poolConfig := riverinternaltest.DatabaseConfig("river_test_example")
poolConfig.ConnConfig.RuntimeParams["search_path"] = schemaName

dbPool, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
panic(err)
}
defer dbPool.Close()

tx, err := dbPool.Begin(ctx)
driver := riverpgxv5.New(dbPool)
migrator, err := rivermigrate.New(driver, nil)
if err != nil {
panic(err)
}
defer tx.Rollback(ctx)

migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil)
if err != nil {
// Create the schema used for this example. Drop it when we're done.
// This isn't necessary outside this test.
if _, err := dbPool.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+schemaName); err != nil {
panic(err)
}

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)
defer dropRiverSchema(ctx, driver, schemaName)

printVersions := func(res *rivermigrate.MigrateResult) {
for _, version := range res.Versions {
Expand All @@ -46,7 +50,7 @@ func Example_migrate() {

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
TargetVersion: 3,
})
if err != nil {
Expand All @@ -56,7 +60,7 @@ func Example_migrate() {

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
res, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
MaxSteps: 3,
})
if err != nil {
Expand All @@ -73,10 +77,8 @@ func Example_migrate() {
// Migrated [DOWN] version 1
}

func dropRiverSchema[TTx any](ctx context.Context, migrator *rivermigrate.Migrator[TTx], tx TTx) {
_, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
TargetVersion: -1,
})
func dropRiverSchema[TTx any](ctx context.Context, driver riverdriver.Driver[TTx], schemaName string) {
_, err := driver.GetExecutor().Exec(ctx, "DROP SCHEMA IF EXISTS "+schemaName+" CASCADE;")
if err != nil {
panic(err)
}
Expand Down
3 changes: 3 additions & 0 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *
// This variant lets a caller run migrations within a transaction. Postgres DDL
// is transactional, so migration changes aren't visible until the transaction
// commits, and are rolled back if the transaction rolls back.
//
// Deprecated: Use Migrate instead. Certain migrations cannot be batched together
// in a single transaction, so this method is not recommended.
func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Direction, opts *MigrateOpts) (*MigrateResult, error) {
switch direction {
case DirectionDown:
Expand Down
Loading

0 comments on commit 2ceb8bb

Please sign in to comment.