From 129922aae306ce181df73bfd7061725cba4ae23b Mon Sep 17 00:00:00 2001 From: Andreas Emilsson Date: Wed, 20 Sep 2023 09:59:55 +0200 Subject: [PATCH] Added support for pgx locking table In order to support running migrations through PgBouncer which does not support advisory locks. --- database/pgx/pgx.go | 158 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 139 insertions(+), 19 deletions(-) diff --git a/database/pgx/pgx.go b/database/pgx/pgx.go index deaca94ea..9e7d36d9e 100644 --- a/database/pgx/pgx.go +++ b/database/pgx/pgx.go @@ -25,6 +25,11 @@ import ( _ "github.com/jackc/pgx/v4/stdlib" ) +const ( + LockStrategyAdvisory = "advisory" + LockStrategyTable = "table" +) + func init() { db := Postgres{} database.Register("pgx", &db) @@ -36,6 +41,8 @@ var ( DefaultMigrationsTable = "schema_migrations" DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB + DefaultLockTable = "schema_lock" + DefaultLockStrategy = "advisory" ) var ( @@ -49,6 +56,8 @@ type Config struct { MigrationsTable string DatabaseName string SchemaName string + LockTable string + LockStrategy string migrationsSchemaName string migrationsTableName string StatementTimeout time.Duration @@ -108,6 +117,14 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { config.MigrationsTable = DefaultMigrationsTable } + if len(config.LockTable) == 0 { + config.LockTable = DefaultLockTable + } + + if len(config.LockStrategy) == 0 { + config.LockStrategy = DefaultLockStrategy + } + config.migrationsSchemaName = config.SchemaName config.migrationsTableName = config.MigrationsTable if config.MigrationsTableQuoted { @@ -133,6 +150,10 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { config: config, } + if err := px.ensureLockTable(); err != nil { + return nil, err + } + if err := px.ensureVersionTable(); err != nil { return nil, err } @@ -196,6 +217,8 @@ func (p *Postgres) Open(url string) (database.Driver, error) { } } + lockStrategy := purl.Query().Get("x-lock-strategy") + px, err := WithInstance(db, &Config{ DatabaseName: purl.Path, MigrationsTable: migrationsTable, @@ -203,6 +226,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { StatementTimeout: time.Duration(statementTimeout) * time.Millisecond, MultiStatementEnabled: multiStatementEnabled, MultiStatementMaxSize: multiStatementMaxSize, + LockStrategy: lockStrategy, }) if err != nil { @@ -221,36 +245,110 @@ func (p *Postgres) Close() error { return nil } -// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS func (p *Postgres) Lock() error { return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error { - aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) - if err != nil { - return err - } - - // This will wait indefinitely until the lock can be acquired. - query := `SELECT pg_advisory_lock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { - return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} + switch p.config.LockStrategy { + case LockStrategyAdvisory: + return p.applyAdvisoryLock() + case LockStrategyTable: + return p.applyTableLock() + default: + return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy) } - return nil }) } func (p *Postgres) Unlock() error { return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error { - aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) - if err != nil { - return err + switch p.config.LockStrategy { + case LockStrategyAdvisory: + return p.releaseAdvisoryLock() + case LockStrategyTable: + return p.releaseTableLock() + default: + return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy) } + }) +} - query := `SELECT pg_advisory_unlock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { - return &database.Error{OrigErr: err, Query: []byte(query)} +// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS +func (p *Postgres) applyAdvisoryLock() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) + if err != nil { + return err + } + + // This will wait indefinitely until the lock can be acquired. + query := `SELECT pg_advisory_lock($1)` + if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} + } + return nil +} + +func (p *Postgres) applyTableLock() error { + tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + return &database.Error{OrigErr: err, Err: "transaction start failed"} + } + + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName) + if err != nil { + return err + } + + query := "SELECT * FROM " + p.config.LockTable + " WHERE lock_id = $1" + rows, err := tx.Query(query, aid) + if err != nil { + return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)} + } + + defer func() { + if errClose := rows.Close(); errClose != nil { + err = multierror.Append(err, errClose) } - return nil - }) + }() + + // If row exists at all, lock is present + locked := rows.Next() + if locked { + return database.ErrLocked + } + + query = "INSERT INTO " + p.config.LockTable + " (lock_id) VALUES ($1)" + if _, err := tx.Exec(query, aid); err != nil { + return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)} + } + + return tx.Commit() +} + +func (p *Postgres) releaseAdvisoryLock() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) + if err != nil { + return err + } + + query := `SELECT pg_advisory_unlock($1)` + if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + + return nil +} + +func (p *Postgres) releaseTableLock() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName) + if err != nil { + return err + } + + query := "DELETE FROM " + p.config.LockTable + " WHERE lock_id = $1" + if _, err := p.db.Exec(query, aid); err != nil { + return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)} + } + + return nil } func (p *Postgres) Run(migration io.Reader) error { @@ -478,6 +576,28 @@ func (p *Postgres) ensureVersionTable() (err error) { return nil } +func (p *Postgres) ensureLockTable() error { + if p.config.LockStrategy != LockStrategyTable { + return nil + } + + var count int + query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1` + if err := p.db.QueryRow(query, p.config.LockTable).Scan(&count); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + if count == 1 { + return nil + } + + query = `CREATE TABLE "` + p.config.LockTable + `" (lock_id BIGINT NOT NULL PRIMARY KEY)` + if _, err := p.db.Exec(query); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + + return nil +} + // Copied from lib/pq implementation: https://github.com/lib/pq/blob/v1.9.0/conn.go#L1611 func quoteIdentifier(name string) string { end := strings.IndexRune(name, 0)