diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e15b8ea2..47604e66 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -124,6 +124,10 @@ jobs: - name: Create database run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE river_dev;" ${ADMIN_DATABASE_URL} + - name: river migrate-get + run: ./river migrate-get --up --version 3 + working-directory: ./cmd/river + - name: river migrate-up run: ./river migrate-up --database-url $DATABASE_URL working-directory: ./cmd/river diff --git a/CHANGELOG.md b/CHANGELOG.md index 591c7033..fd0f78d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - The River CLI now supports `river bench` to benchmark River's job throughput against a database. [PR #254](https://github.com/riverqueue/river/pull/254). +- The River CLI now has a `river migrate-get` command to dump SQL for River migrations for use in alternative migration frameworks. Use it like `river migrate-get --up --version 3 > version3.up.sql`. [PR #273](https://github.com/riverqueue/river/pull/273). +- The River CLI's `migrate-down` and `migrate-up` options get two new options for `--dry-run` and `--show-sql`. They can be combined to easily run a preflight check on a River upgrade to see which migration commands would be run on a database, but without actually running them. [PR #273](https://github.com/riverqueue/river/pull/273). - The River client gets a new `Client.SubscribeConfig` function that lets a subscriber specify the maximum size of their subscription channel. [PR #258](https://github.com/riverqueue/river/pull/258). ### Changed diff --git a/cmd/river/main.go b/cmd/river/main.go index 57cb0f20..483ced9a 100644 --- a/cmd/river/main.go +++ b/cmd/river/main.go @@ -4,9 +4,11 @@ import ( "context" "errors" "fmt" + "io" "log/slog" "os" "strconv" + "strings" "time" "github.com/jackc/pgx/v5/pgxpool" @@ -19,30 +21,49 @@ import ( ) func main() { + var rootOpts struct { + Debug bool + Verbose bool + } + rootCmd := &cobra.Command{ Use: "river", Short: "Provides command line facilities for the River job queue", - Long: ` + Long: strings.TrimSpace(` Provides command line facilities for the River job queue. - `, + `), Run: func(cmd *cobra.Command, args []string) { _ = cmd.Usage() }, } + rootCmd.PersistentFlags().BoolVar(&rootOpts.Debug, "debug", false, "output maximum logging verbosity (debug level)") + rootCmd.PersistentFlags().BoolVarP(&rootOpts.Verbose, "verbose", "v", false, "output additional logging verbosity (info level)") + rootCmd.MarkFlagsMutuallyExclusive("debug", "verbose") ctx := context.Background() execHandlingError := func(f func() (bool, error)) { ok, err := f() if err != nil { - fmt.Printf("failed: %s\n", err) + fmt.Fprintf(os.Stderr, "failed: %s\n", err) } if err != nil || !ok { os.Exit(1) } } - mustMarkFlagRequired := func(cmd *cobra.Command, name string) { //nolint:unparam + makeLogger := func() *slog.Logger { + switch { + case rootOpts.Debug: + return slog.New(tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelDebug})) + case rootOpts.Verbose: + return slog.New(tint.NewHandler(os.Stdout, nil)) + default: + return slog.New(tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelWarn})) + } + } + + mustMarkFlagRequired := func(cmd *cobra.Command, name string) { // We just panic here because this will never happen outside of an error // in development. if err := cmd.MarkFlagRequired(name); err != nil { @@ -57,7 +78,7 @@ Provides command line facilities for the River job queue. cmd := &cobra.Command{ Use: "bench", Short: "Run River benchmark", - Long: ` + Long: strings.TrimSpace(` Run a River benchmark which inserts and works jobs continually, giving a rough idea of jobs per second and time to work a single job. @@ -69,66 +90,104 @@ before starting the client, and works until all jobs are finished. The database in --database-url will have its jobs table truncated, so make sure to use a development database only. - `, + `), Run: func(cmd *cobra.Command, args []string) { - execHandlingError(func() (bool, error) { return bench(ctx, &opts) }) + execHandlingError(func() (bool, error) { return bench(ctx, makeLogger(), os.Stdout, &opts) }) }, } cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to benchmark (should look like `postgres://...`") - cmd.Flags().BoolVar(&opts.Debug, "debug", false, "output maximum logging verbosity (debug level)") cmd.Flags().DurationVar(&opts.Duration, "duration", 0, "duration after which to stop benchmark, accepting Go-style durations like 1m, 5m30s") cmd.Flags().IntVarP(&opts.NumTotalJobs, "num-total-jobs", "n", 0, "number of jobs to insert before starting and which are worked down until finish") - cmd.Flags().BoolVarP(&opts.Verbose, "verbose", "v", false, "output additional logging verbosity (info level)") mustMarkFlagRequired(cmd, "database-url") - cmd.MarkFlagsMutuallyExclusive("debug", "verbose") rootCmd.AddCommand(cmd) } + // migrate-down and migrate-up share a set of options, so this is a way of + // plugging in all the right flags to both so options and docstrings stay + // consistent. + addMigrateFlags := func(cmd *cobra.Command, opts *migrateOpts) { + cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`") + cmd.Flags().BoolVar(&opts.DryRun, "dry-run", false, "print information on migrations, but don't apply them") + cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 0, "maximum number of steps to migrate") + cmd.Flags().BoolVar(&opts.ShowSQL, "show-sql", false, "show SQL of each migration") + cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "target version to migrate to (final state includes this version, but none after it)") + mustMarkFlagRequired(cmd, "database-url") + } + // migrate-down { - var opts migrateDownOpts + var opts migrateOpts cmd := &cobra.Command{ Use: "migrate-down", Short: "Run River schema down migrations", - Long: ` + Long: strings.TrimSpace(` Run down migrations to reverse the River database schema changes. Defaults to running a single down migration. This behavior can be changed with --max-steps or --target-version. - `, + +SQL being run can be output using --show-sql, and executing real database +operations can be prevented with --dry-run. Combine --show-sql and --dry-run to +dump prospective migrations that would be applied to stdout. + `), Run: func(cmd *cobra.Command, args []string) { - execHandlingError(func() (bool, error) { return migrateDown(ctx, &opts) }) + execHandlingError(func() (bool, error) { return migrateDown(ctx, makeLogger(), os.Stdout, &opts) }) }, } - cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`") - cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 1, "maximum number of steps to migrate") - cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "target version to migrate to (final state includes this version, but none after it)") - mustMarkFlagRequired(cmd, "database-url") + addMigrateFlags(cmd, &opts) + rootCmd.AddCommand(cmd) + } + + // migrate-get + { + var opts migrateGetOpts + + cmd := &cobra.Command{ + Use: "migrate-get", + Short: "Get SQL for specific River migration", + Long: strings.TrimSpace(` +Retrieve SQL for a single migration version. This command is aimed at cases +where using River's internal migration framework isn't desirable by allowing +migration SQL to be dumped for use elsewhere. + +Specify a single version with --version, and one of --down or --up. + `), + Run: func(cmd *cobra.Command, args []string) { + execHandlingError(func() (bool, error) { return migrateGet(ctx, makeLogger(), os.Stdout, &opts) }) + }, + } + cmd.Flags().BoolVar(&opts.Down, "down", false, "print down migration") + cmd.Flags().BoolVar(&opts.Up, "up", false, "print up migration") + cmd.Flags().IntVar(&opts.Version, "version", 0, "version to print") + cmd.MarkFlagsMutuallyExclusive("down", "up") + cmd.MarkFlagsOneRequired("down", "up") + mustMarkFlagRequired(cmd, "version") rootCmd.AddCommand(cmd) } // migrate-up { - var opts migrateUpOpts + var opts migrateOpts cmd := &cobra.Command{ Use: "migrate-up", Short: "Run River schema up migrations", - Long: ` + Long: strings.TrimSpace(` Run up migrations to raise the database schema necessary to run River. Defaults to running all up migrations that aren't yet run. This behavior can be restricted with --max-steps or --target-version. - `, + +SQL being run can be output using --show-sql, and executing real database +operations can be prevented with --dry-run. Combine --show-sql and --dry-run to +dump prospective migrations that would be applied to stdout. + `), Run: func(cmd *cobra.Command, args []string) { - execHandlingError(func() (bool, error) { return migrateUp(ctx, &opts) }) + execHandlingError(func() (bool, error) { return migrateUp(ctx, makeLogger(), os.Stdout, &opts) }) }, } - cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to migrate (should look like `postgres://...`") - cmd.Flags().IntVar(&opts.MaxSteps, "max-steps", 0, "maximum number of steps to migrate") - cmd.Flags().IntVar(&opts.TargetVersion, "target-version", 0, "target version to migrate to (final state includes this version)") - mustMarkFlagRequired(cmd, "database-url") + addMigrateFlags(cmd, &opts) rootCmd.AddCommand(cmd) } @@ -139,12 +198,12 @@ restricted with --max-steps or --target-version. cmd := &cobra.Command{ Use: "validate", Short: "Validate River schema", - Long: ` + Long: strings.TrimSpace(` Validates the current River schema, exiting with a non-zero status in case there are outstanding migrations that still need to be run. - `, + `), Run: func(cmd *cobra.Command, args []string) { - execHandlingError(func() (bool, error) { return validate(ctx, &opts) }) + execHandlingError(func() (bool, error) { return validate(ctx, makeLogger(), os.Stdout, &opts) }) }, } cmd.Flags().StringVar(&opts.DatabaseURL, "database-url", "", "URL of the database to validate (should look like `postgres://...`") @@ -152,7 +211,10 @@ are outstanding migrations that still need to be run. rootCmd.AddCommand(cmd) } - execHandlingError(func() (bool, error) { return true, rootCmd.Execute() }) + // Cobra will already print an error on an uknown command, and there aren't + // really any other important top-level error cases to worry about as far as + // I can tell, so ignore a returned error here so we don't double print it. + _ = rootCmd.Execute() } func openDBPool(ctx context.Context, databaseURL string) (*pgxpool.Pool, error) { @@ -204,7 +266,7 @@ func (o *benchOpts) validate() error { return nil } -func bench(ctx context.Context, opts *benchOpts) (bool, error) { +func bench(ctx context.Context, logger *slog.Logger, _ io.Writer, opts *benchOpts) (bool, error) { if err := opts.validate(); err != nil { return false, err } @@ -215,16 +277,6 @@ func bench(ctx context.Context, opts *benchOpts) (bool, error) { } defer dbPool.Close() - var logger *slog.Logger - switch { - case opts.Debug: - logger = slog.New(tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelDebug})) - case opts.Verbose: - logger = slog.New(tint.NewHandler(os.Stdout, nil)) - default: - logger = slog.New(tint.NewHandler(os.Stdout, &tint.Options{Level: slog.LevelWarn})) - } - benchmarker := riverbench.NewBenchmarker(riverpgxv5.New(dbPool), logger, opts.Duration, opts.NumTotalJobs) if err := benchmarker.Run(ctx); err != nil { @@ -234,13 +286,15 @@ func bench(ctx context.Context, opts *benchOpts) (bool, error) { return true, nil } -type migrateDownOpts struct { +type migrateOpts struct { DatabaseURL string + DryRun bool + ShowSQL bool MaxSteps int TargetVersion int } -func (o *migrateDownOpts) validate() error { +func (o *migrateOpts) validate() error { if o.DatabaseURL == "" { return errors.New("database URL cannot be empty") } @@ -248,20 +302,26 @@ func (o *migrateDownOpts) validate() error { return nil } -func migrateDown(ctx context.Context, opts *migrateDownOpts) (bool, error) { +func migrateDown(ctx context.Context, logger *slog.Logger, out io.Writer, opts *migrateOpts) (bool, error) { if err := opts.validate(); err != nil { return false, err } + // Default to applying only one migration maximum on the down direction. + if opts.MaxSteps == 0 && opts.TargetVersion == 0 { + opts.MaxSteps = 1 + } + dbPool, err := openDBPool(ctx, opts.DatabaseURL) if err != nil { return false, err } defer dbPool.Close() - migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator := rivermigrate.New(riverpgxv5.New(dbPool), &rivermigrate.Config{Logger: logger}) - _, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + res, err := migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + DryRun: opts.DryRun, MaxSteps: opts.MaxSteps, TargetVersion: opts.TargetVersion, }) @@ -269,24 +329,64 @@ func migrateDown(ctx context.Context, opts *migrateDownOpts) (bool, error) { return false, err } + migratePrintResult(out, opts, res, rivermigrate.DirectionDown) + return true, nil } -type migrateUpOpts struct { - DatabaseURL string - MaxSteps int - TargetVersion int +func migratePrintResult(out io.Writer, opts *migrateOpts, res *rivermigrate.MigrateResult, direction rivermigrate.Direction) { + if len(res.Versions) < 1 { + fmt.Fprintf(out, "no migrations to apply\n") + return + } + + for _, migrateVersion := range res.Versions { + if opts.DryRun { + fmt.Fprintf(out, "migration %03d [%s] [DRY RUN]\n", migrateVersion.Version, direction) + } else { + fmt.Fprintf(out, "applied migration %03d [%s] [%s]\n", migrateVersion.Version, direction, migrateVersion.Duration) + } + + if opts.ShowSQL { + fmt.Fprintf(out, "%s\n", strings.Repeat("-", 80)) + fmt.Fprintf(out, "%s\n\n", strings.TrimSpace(migrateVersion.SQL)) + } + } + + // Only prints if more steps than available were requested. + if opts.MaxSteps > 0 && len(res.Versions) < opts.MaxSteps { + fmt.Fprintf(out, "no more migrations to apply\n") + } } -func (o *migrateUpOpts) validate() error { - if o.DatabaseURL == "" { - return errors.New("database URL cannot be empty") +type migrateGetOpts struct { + Down bool + Up bool + Version int +} + +func migrateGet(ctx context.Context, logger *slog.Logger, out io.Writer, opts *migrateGetOpts) (bool, error) { + var direction rivermigrate.Direction + switch { + case opts.Down: + direction = rivermigrate.DirectionDown + case opts.Up: + direction = rivermigrate.DirectionUp } - return nil + migrator := rivermigrate.New(riverpgxv5.New(nil), &rivermigrate.Config{Logger: logger}) + + migration, err := migrator.GetVersion(ctx, direction, opts.Version) + if err != nil { + return false, err + } + + fmt.Fprintf(out, "%s\n", strings.TrimSpace(migration.SQL)) + + return true, nil } -func migrateUp(ctx context.Context, opts *migrateUpOpts) (bool, error) { +func migrateUp(ctx context.Context, logger *slog.Logger, out io.Writer, opts *migrateOpts) (bool, error) { if err := opts.validate(); err != nil { return false, err } @@ -297,9 +397,10 @@ func migrateUp(ctx context.Context, opts *migrateUpOpts) (bool, error) { } defer dbPool.Close() - migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator := rivermigrate.New(riverpgxv5.New(dbPool), &rivermigrate.Config{Logger: logger}) - _, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ + res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{ + DryRun: opts.DryRun, MaxSteps: opts.MaxSteps, TargetVersion: opts.TargetVersion, }) @@ -307,6 +408,8 @@ func migrateUp(ctx context.Context, opts *migrateUpOpts) (bool, error) { return false, err } + migratePrintResult(out, opts, res, rivermigrate.DirectionUp) + return true, nil } @@ -322,7 +425,7 @@ func (o *validateOpts) validate() error { return nil } -func validate(ctx context.Context, opts *validateOpts) (bool, error) { +func validate(ctx context.Context, logger *slog.Logger, _ io.Writer, opts *validateOpts) (bool, error) { if err := opts.validate(); err != nil { return false, err } @@ -333,7 +436,7 @@ func validate(ctx context.Context, opts *validateOpts) (bool, error) { } defer dbPool.Close() - migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator := rivermigrate.New(riverpgxv5.New(dbPool), &rivermigrate.Config{Logger: logger}) res, err := migrator.Validate(ctx) if err != nil { diff --git a/rivermigrate/river_migrate.go b/rivermigrate/river_migrate.go index 10128130..d046d58b 100644 --- a/rivermigrate/river_migrate.go +++ b/rivermigrate/river_migrate.go @@ -107,6 +107,8 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx] // MigrateOpts are options for a migrate operation. type MigrateOpts struct { + DryRun bool + // MaxSteps is the maximum number of migrations to apply either up or down. // When migrating in the up direction, migrates an unlimited number of steps // by default. When migrating in the down direction, migrates only a single @@ -143,6 +145,12 @@ type MigrateResult struct { // MigrateVersion is the result for a single applied migration. type MigrateVersion struct { + // Duration is the amount of time it took to apply the migration. + Duration time.Duration + + // SQL is the SQL that was applied along with the migration. + SQL string + // Version is the version of the migration applied. Version int } @@ -156,6 +164,27 @@ const ( DirectionUp Direction = "up" ) +// GetVersion gets information about a specific migration version. An error is +// returned if a versions is requested that doesn't exist. +func (m *Migrator[TTx]) GetVersion(ctx context.Context, direction Direction, version int) (*MigrateVersion, error) { + versionBundle, ok := m.migrations[version] + if !ok { + availableVersions := maputil.Keys(m.migrations) + slices.Sort(availableVersions) + return nil, fmt.Errorf("migration %d not found (available versions: %v)", version, availableVersions) + } + + var sql string + switch direction { + case DirectionDown: + sql = versionBundle.Down + case DirectionUp: + sql = versionBundle.Up + } + + return &MigrateVersion{SQL: sql, Version: versionBundle.Version}, nil +} + // Migrate migrates the database in the given direction (up or down). The opts // parameter may be omitted for convenience. // @@ -171,12 +200,12 @@ const ( // // handle error // } func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *MigrateOpts) (*MigrateResult, error) { - return dbutil.WithTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, tx riverdriver.ExecutorTx) (*MigrateResult, error) { + return dbutil.WithTxV(ctx, m.driver.GetExecutor(), func(ctx context.Context, exec riverdriver.ExecutorTx) (*MigrateResult, error) { switch direction { case DirectionDown: - return m.migrateDown(ctx, tx, direction, opts) + return m.migrateDown(ctx, exec, direction, opts) case DirectionUp: - return m.migrateUp(ctx, tx, direction, opts) + return m.migrateUp(ctx, exec, direction, opts) } panic("invalid direction: " + direction) @@ -278,8 +307,10 @@ func (m *Migrator[TTx]) migrateDown(ctx context.Context, exec riverdriver.Execut return res, nil } - if _, err := exec.MigrationDeleteByVersionMany(ctx, sliceutil.Map(res.Versions, migrateVersionToInt)); err != nil { - return nil, fmt.Errorf("error deleting migration rows for versions %+v: %w", res.Versions, err) + if !opts.DryRun { + if _, err := exec.MigrationDeleteByVersionMany(ctx, sliceutil.Map(res.Versions, migrateVersionToInt)); err != nil { + return nil, fmt.Errorf("error deleting migration rows for versions %+v: %w", res.Versions, err) + } } return res, nil @@ -305,8 +336,10 @@ func (m *Migrator[TTx]) migrateUp(ctx context.Context, exec riverdriver.Executor return nil, err } - if _, err := exec.MigrationInsertMany(ctx, sliceutil.Map(res.Versions, migrateVersionToInt)); err != nil { - return nil, fmt.Errorf("error inserting migration rows for versions %+v: %w", res.Versions, err) + if opts == nil || !opts.DryRun { + if _, err := exec.MigrationInsertMany(ctx, sliceutil.Map(res.Versions, migrateVersionToInt)); err != nil { + return nil, fmt.Errorf("error inserting migration rows for versions %+v: %w", res.Versions, err) + } } return res, nil @@ -391,28 +424,34 @@ func (m *Migrator[TTx]) applyMigrations(ctx context.Context, exec riverdriver.Ex } for _, versionBundle := range sortedTargetMigrations { - sql := versionBundle.Up - if direction == DirectionDown { + var sql string + switch direction { + case DirectionDown: sql = versionBundle.Down + case DirectionUp: + sql = versionBundle.Up } - m.Logger.InfoContext(ctx, fmt.Sprintf(m.Name+": Applying migration %03d [%s]", versionBundle.Version, strings.ToUpper(string(direction))), - slog.String("direction", string(direction)), - slog.Int("version", versionBundle.Version), - ) + var duration time.Duration - _, err := exec.Exec(ctx, sql) - if err != nil { - return nil, fmt.Errorf("error applying version %03d [%s]: %w", - versionBundle.Version, strings.ToUpper(string(direction)), err) + if !opts.DryRun { + start := time.Now() + _, err := exec.Exec(ctx, sql) + if err != nil { + return nil, fmt.Errorf("error applying version %03d [%s]: %w", + versionBundle.Version, strings.ToUpper(string(direction)), err) + } + duration = time.Since(start) } - res.Versions = append(res.Versions, MigrateVersion{Version: versionBundle.Version}) - } + m.Logger.InfoContext(ctx, m.Name+": Applied migration", + slog.String("direction", string(direction)), + slog.Bool("dry_run", opts.DryRun), + slog.Duration("duration", duration), + slog.Int("version", versionBundle.Version), + ) - // Only prints if more steps than available were requested. - if opts.MaxSteps > 0 && len(res.Versions) < opts.MaxSteps { - m.Logger.InfoContext(ctx, m.Name+": No more migrations to apply") + res.Versions = append(res.Versions, MigrateVersion{Duration: duration, SQL: sql, Version: versionBundle.Version}) } return res, nil diff --git a/rivermigrate/river_migrate_test.go b/rivermigrate/river_migrate_test.go index ef99434f..b6bf7548 100644 --- a/rivermigrate/river_migrate_test.go +++ b/rivermigrate/river_migrate_test.go @@ -262,6 +262,54 @@ func TestMigrator(t *testing.T) { } }) + t.Run("MigrateDownDryRun", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + _, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{}) + require.NoError(t, err) + + res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionDown, &MigrateOpts{DryRun: true}) + require.NoError(t, err) + require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion}, sliceutil.Map(res.Versions, migrateVersionToInt)) + + // Migrate down returned a result above for a migration that was + // removed, but because we're in a dry run, the database still shows + // this version. + migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetAll(ctx) + require.NoError(t, err) + require.Equal(t, seqOneTo(riverMigrationsWithTestVersionsMaxVersion), + sliceutil.Map(migrations, migrationToInt)) + }) + + t.Run("MigrateGetVersion", func(t *testing.T) { + t.Parallel() + + migrator, _ := setup(t) + + { + migrateVersion, err := migrator.GetVersion(ctx, DirectionDown, riverMigrationsWithTestVersionsMaxVersion) + require.NoError(t, err) + require.Equal(t, riverMigrationsWithTestVersionsMaxVersion, migrateVersion.Version) + require.Equal(t, migrator.migrations[riverMigrationsWithTestVersionsMaxVersion].Down, migrateVersion.SQL) + } + + { + migrateVersion, err := migrator.GetVersion(ctx, DirectionUp, riverMigrationsWithTestVersionsMaxVersion) + require.NoError(t, err) + require.Equal(t, riverMigrationsWithTestVersionsMaxVersion, migrateVersion.Version) + require.Equal(t, migrator.migrations[riverMigrationsWithTestVersionsMaxVersion].Up, migrateVersion.SQL) + } + + { + _, err := migrator.GetVersion(ctx, DirectionUp, 99_999) + + availableVersions := seqOneTo(riverMigrationsWithTestVersionsMaxVersion) + require.EqualError(t, err, fmt.Sprintf("migration %d not found (available versions: %v)", 99_999, availableVersions)) + } + }) + t.Run("MigrateNilOpts", func(t *testing.T) { t.Parallel() @@ -402,6 +450,26 @@ func TestMigrator(t *testing.T) { } }) + t.Run("MigrateUpDryRun", func(t *testing.T) { + t.Parallel() + + migrator, bundle := setup(t) + + res, err := migrator.MigrateTx(ctx, bundle.tx, DirectionUp, &MigrateOpts{DryRun: true}) + require.NoError(t, err) + require.Equal(t, DirectionUp, res.Direction) + require.Equal(t, []int{riverMigrationsWithTestVersionsMaxVersion - 1, riverMigrationsWithTestVersionsMaxVersion}, + sliceutil.Map(res.Versions, migrateVersionToInt)) + + // Migrate up returned a result above for migrations that were applied, + // but because we're in a dry run, the database still shows the test + // migration versions not applied. + migrations, err := bundle.driver.UnwrapExecutor(bundle.tx).MigrationGetAll(ctx) + require.NoError(t, err) + require.Equal(t, seqOneTo(riverMigrationsMaxVersion), + sliceutil.Map(migrations, migrationToInt)) + }) + t.Run("ValidateSuccess", func(t *testing.T) { t.Parallel()