Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

used uber atomic bool instead standard in lock/unlock db #580

Merged
merged 14 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions database/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cassandra
import (
"errors"
"fmt"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand Down Expand Up @@ -45,7 +46,7 @@ type Config struct {

type Cassandra struct {
session *gocql.Session
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -182,15 +183,16 @@ func (c *Cassandra) Close() error {
}

func (c *Cassandra) Lock() error {
if c.isLocked {
if !c.isLocked.CAS(false, true) {
return database.ErrLocked
}
c.isLocked = true
return nil
}

func (c *Cassandra) Unlock() error {
c.isLocked = false
if !c.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion database/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (ch *ClickHouse) Lock() error {
}
func (ch *ClickHouse) Unlock() error {
if !ch.isLocked.CAS(true, false) {
return database.ErrLocked
return database.ErrNotLocked
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this is my bug))

}

return nil
Expand Down
101 changes: 49 additions & 52 deletions database/cockroachdb/cockroachdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand Down Expand Up @@ -46,7 +47,7 @@ type Config struct {

type CockroachDb struct {
db *sql.DB
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -152,71 +153,67 @@ func (c *CockroachDb) Close() error {
// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed
// See: https://github.com/cockroachdb/cockroach/issues/13546
func (c *CockroachDb) Lock() error {
err := crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}

query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1"
rows, err := tx.Query(query, aid)
if err != nil {
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
}
defer func() {
if errClose := rows.Close(); errClose != nil {
err = multierror.Append(err, errClose)
return database.CasRestoreOnErr(&c.isLocked, false, true, database.ErrLocked, func() (err error) {
return crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}
}()

// If row exists at all, lock is present
locked := rows.Next()
if locked && !c.config.ForceLock {
return database.ErrLocked
}
query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1"
rows, err := tx.Query(query, aid)
if err != nil {
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
}
defer func() {
if errClose := rows.Close(); errClose != nil {
err = multierror.Append(err, errClose)
}
}()

// If row exists at all, lock is present
locked := rows.Next()
if locked && !c.config.ForceLock {
return database.ErrLocked
}

query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)"
if _, err := tx.Exec(query, aid); err != nil {
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
}
query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)"
if _, err := tx.Exec(query, aid); err != nil {
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
}

return nil
return nil
})
})

if err != nil {
return err
} else {
c.isLocked = true
return nil
}
}

// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed
// See: https://github.com/cockroachdb/cockroach/issues/13546
func (c *CockroachDb) Unlock() error {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}
return database.CasRestoreOnErr(&c.isLocked, true, false, database.ErrNotLocked, func() (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}

// In the event of an implementation (non-migration) error, it is possible for the lock to not be released. Until
// a better locking mechanism is added, a manual purging of the lock table may be required in such circumstances
query := "DELETE FROM " + c.config.LockTable + " WHERE lock_id = $1"
if _, err := c.db.Exec(query, aid); err != nil {
if e, ok := err.(*pq.Error); ok {
// 42P01 is "UndefinedTableError" in CockroachDB
// https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go
if e.Code == "42P01" {
// On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema
c.isLocked = false
return nil
// In the event of an implementation (non-migration) error, it is possible for the lock to not be released. Until
// a better locking mechanism is added, a manual purging of the lock table may be required in such circumstances
query := "DELETE FROM " + c.config.LockTable + " WHERE lock_id = $1"
if _, err := c.db.Exec(query, aid); err != nil {
if e, ok := err.(*pq.Error); ok {
// 42P01 is "UndefinedTableError" in CockroachDB
// https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go
if e.Code == "42P01" {
// On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema
return nil
}
}

return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
}
return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
}

c.isLocked = false
return nil
return nil
})
}

func (c *CockroachDb) Run(migration io.Reader) error {
Expand Down
10 changes: 6 additions & 4 deletions database/firebird/firebird.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/golang-migrate/migrate/v4/database"
"github.com/hashicorp/go-multierror"
_ "github.com/nakagami/firebirdsql"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand All @@ -36,7 +37,7 @@ type Firebird struct {
// Locking and unlocking need to use the same connection
conn *sql.Conn
db *sql.DB
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -106,15 +107,16 @@ func (f *Firebird) Close() error {
}

func (f *Firebird) Lock() error {
if f.isLocked {
if !f.isLocked.CAS(false, true) {
return database.ErrLocked
}
f.isLocked = true
return nil
}

func (f *Firebird) Unlock() error {
f.isLocked = false
if !f.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

Expand Down
98 changes: 52 additions & 46 deletions database/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
"go.uber.org/atomic"
"io"
"io/ioutil"
"net/url"
Expand Down Expand Up @@ -40,9 +41,10 @@ var (
)

type Mongo struct {
client *mongo.Client
db *mongo.Database
config *Config
client *mongo.Client
db *mongo.Database
config *Config
isLocked atomic.Bool
}

type Locking struct {
Expand Down Expand Up @@ -327,55 +329,59 @@ func (m *Mongo) ensureVersionTable() (err error) {
// Utilizes advisory locking on the config.LockingCollection collection
// This uses a unique index on the `locking_key` field.
func (m *Mongo) Lock() error {
if !m.config.Locking.Enabled {
return nil
}
pid := os.Getpid()
hostname, err := os.Hostname()
if err != nil {
hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error())
}

newLockObj := lockObj{
Key: lockKeyUniqueValue,
Pid: pid,
Hostname: hostname,
CreatedAt: time.Now(),
}
operation := func() error {
timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj)
defer cancelFunc()
return err
}
exponentialBackOff := backoff.NewExponentialBackOff()
duration := time.Duration(m.config.Locking.Timeout) * time.Second
exponentialBackOff.MaxElapsedTime = duration
exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second
return database.CasRestoreOnErr(&m.isLocked, false, true, database.ErrLocked, func() error {
if !m.config.Locking.Enabled {
return nil
}
pid := os.Getpid()
hostname, err := os.Hostname()
if err != nil {
hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error())
}

err = backoff.Retry(operation, exponentialBackOff)
if err != nil {
return database.ErrLocked
}
newLockObj := lockObj{
Key: lockKeyUniqueValue,
Pid: pid,
Hostname: hostname,
CreatedAt: time.Now(),
}
operation := func() error {
timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj)
defer cancelFunc()
return err
}
exponentialBackOff := backoff.NewExponentialBackOff()
duration := time.Duration(m.config.Locking.Timeout) * time.Second
exponentialBackOff.MaxElapsedTime = duration
exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second

return nil
err = backoff.Retry(operation, exponentialBackOff)
if err != nil {
return database.ErrLocked
}

return nil
})
}

func (m *Mongo) Unlock() error {
if !m.config.Locking.Enabled {
return nil
}
return database.CasRestoreOnErr(&m.isLocked, true, false, database.ErrNotLocked, func() error {
if !m.config.Locking.Enabled {
return nil
}

filter := findFilter{
Key: lockKeyUniqueValue,
}
filter := findFilter{
Key: lockKeyUniqueValue,
}

ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter)
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter)
defer cancel()

if err != nil {
return err
}
return nil
if err != nil {
return err
}
return nil
})
}
Loading