Skip to content

Commit

Permalink
used uber atomic bool instead standard in lock/unlock db (#580)
Browse files Browse the repository at this point in the history
* used uber atomic bool instead standard in lock/unlock db

* fixing local lock optimization implementation

* fixing mysql local lock optimization implementation

* fixing cockroachdb local lock optimization implementation

* CAS wrapper to automatically restore

* added test for cas with restore

* added atomic lock to sqlite & mongo

* added atomic lock to sqlite & mongo

* fix/refactor test cas restore function

* disable lock for mongo

* mongo local lock before acquiring the db lock

Co-authored-by: a.prinkov <[email protected]>
  • Loading branch information
prinkov and a.prinkov authored Jun 23, 2021
1 parent 3b3c1b6 commit 3dfb0ff
Show file tree
Hide file tree
Showing 19 changed files with 347 additions and 299 deletions.
10 changes: 6 additions & 4 deletions database/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cassandra
import (
"errors"
"fmt"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand Down Expand Up @@ -45,7 +46,7 @@ type Config struct {

type Cassandra struct {
session *gocql.Session
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -182,15 +183,16 @@ func (c *Cassandra) Close() error {
}

func (c *Cassandra) Lock() error {
if c.isLocked {
if !c.isLocked.CAS(false, true) {
return database.ErrLocked
}
c.isLocked = true
return nil
}

func (c *Cassandra) Unlock() error {
c.isLocked = false
if !c.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion database/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (ch *ClickHouse) Lock() error {
}
func (ch *ClickHouse) Unlock() error {
if !ch.isLocked.CAS(true, false) {
return database.ErrLocked
return database.ErrNotLocked
}

return nil
Expand Down
101 changes: 49 additions & 52 deletions database/cockroachdb/cockroachdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand Down Expand Up @@ -46,7 +47,7 @@ type Config struct {

type CockroachDb struct {
db *sql.DB
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -152,71 +153,67 @@ func (c *CockroachDb) Close() error {
// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed
// See: https://github.com/cockroachdb/cockroach/issues/13546
func (c *CockroachDb) Lock() error {
err := crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}

query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1"
rows, err := tx.Query(query, aid)
if err != nil {
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
}
defer func() {
if errClose := rows.Close(); errClose != nil {
err = multierror.Append(err, errClose)
return database.CasRestoreOnErr(&c.isLocked, false, true, database.ErrLocked, func() (err error) {
return crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}
}()

// If row exists at all, lock is present
locked := rows.Next()
if locked && !c.config.ForceLock {
return database.ErrLocked
}
query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1"
rows, err := tx.Query(query, aid)
if err != nil {
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
}
defer func() {
if errClose := rows.Close(); errClose != nil {
err = multierror.Append(err, errClose)
}
}()

// If row exists at all, lock is present
locked := rows.Next()
if locked && !c.config.ForceLock {
return database.ErrLocked
}

query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)"
if _, err := tx.Exec(query, aid); err != nil {
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
}
query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)"
if _, err := tx.Exec(query, aid); err != nil {
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
}

return nil
return nil
})
})

if err != nil {
return err
} else {
c.isLocked = true
return nil
}
}

// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed
// See: https://github.com/cockroachdb/cockroach/issues/13546
func (c *CockroachDb) Unlock() error {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}
return database.CasRestoreOnErr(&c.isLocked, true, false, database.ErrNotLocked, func() (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
}

// In the event of an implementation (non-migration) error, it is possible for the lock to not be released. Until
// a better locking mechanism is added, a manual purging of the lock table may be required in such circumstances
query := "DELETE FROM " + c.config.LockTable + " WHERE lock_id = $1"
if _, err := c.db.Exec(query, aid); err != nil {
if e, ok := err.(*pq.Error); ok {
// 42P01 is "UndefinedTableError" in CockroachDB
// https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go
if e.Code == "42P01" {
// On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema
c.isLocked = false
return nil
// In the event of an implementation (non-migration) error, it is possible for the lock to not be released. Until
// a better locking mechanism is added, a manual purging of the lock table may be required in such circumstances
query := "DELETE FROM " + c.config.LockTable + " WHERE lock_id = $1"
if _, err := c.db.Exec(query, aid); err != nil {
if e, ok := err.(*pq.Error); ok {
// 42P01 is "UndefinedTableError" in CockroachDB
// https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go
if e.Code == "42P01" {
// On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema
return nil
}
}

return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
}
return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
}

c.isLocked = false
return nil
return nil
})
}

func (c *CockroachDb) Run(migration io.Reader) error {
Expand Down
10 changes: 6 additions & 4 deletions database/firebird/firebird.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/golang-migrate/migrate/v4/database"
"github.com/hashicorp/go-multierror"
_ "github.com/nakagami/firebirdsql"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand All @@ -36,7 +37,7 @@ type Firebird struct {
// Locking and unlocking need to use the same connection
conn *sql.Conn
db *sql.DB
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -106,15 +107,16 @@ func (f *Firebird) Close() error {
}

func (f *Firebird) Lock() error {
if f.isLocked {
if !f.isLocked.CAS(false, true) {
return database.ErrLocked
}
f.isLocked = true
return nil
}

func (f *Firebird) Unlock() error {
f.isLocked = false
if !f.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

Expand Down
97 changes: 52 additions & 45 deletions database/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
"go.uber.org/atomic"
"io"
"io/ioutil"
"net/url"
Expand Down Expand Up @@ -40,9 +41,10 @@ var (
)

type Mongo struct {
client *mongo.Client
db *mongo.Database
config *Config
client *mongo.Client
db *mongo.Database
config *Config
isLocked atomic.Bool
}

type Locking struct {
Expand Down Expand Up @@ -327,55 +329,60 @@ func (m *Mongo) ensureVersionTable() (err error) {
// Utilizes advisory locking on the config.LockingCollection collection
// This uses a unique index on the `locking_key` field.
func (m *Mongo) Lock() error {
if !m.config.Locking.Enabled {
return nil
}
pid := os.Getpid()
hostname, err := os.Hostname()
if err != nil {
hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error())
}
return database.CasRestoreOnErr(&m.isLocked, false, true, database.ErrLocked, func() error {
if !m.config.Locking.Enabled {
return nil
}

newLockObj := lockObj{
Key: lockKeyUniqueValue,
Pid: pid,
Hostname: hostname,
CreatedAt: time.Now(),
}
operation := func() error {
timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj)
defer cancelFunc()
return err
}
exponentialBackOff := backoff.NewExponentialBackOff()
duration := time.Duration(m.config.Locking.Timeout) * time.Second
exponentialBackOff.MaxElapsedTime = duration
exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second
pid := os.Getpid()
hostname, err := os.Hostname()
if err != nil {
hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error())
}

err = backoff.Retry(operation, exponentialBackOff)
if err != nil {
return database.ErrLocked
}
newLockObj := lockObj{
Key: lockKeyUniqueValue,
Pid: pid,
Hostname: hostname,
CreatedAt: time.Now(),
}
operation := func() error {
timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj)
defer cancelFunc()
return err
}
exponentialBackOff := backoff.NewExponentialBackOff()
duration := time.Duration(m.config.Locking.Timeout) * time.Second
exponentialBackOff.MaxElapsedTime = duration
exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second

return nil
err = backoff.Retry(operation, exponentialBackOff)
if err != nil {
return database.ErrLocked
}

return nil
})
}

func (m *Mongo) Unlock() error {
if !m.config.Locking.Enabled {
return nil
}
return database.CasRestoreOnErr(&m.isLocked, true, false, database.ErrNotLocked, func() error {
if !m.config.Locking.Enabled {
return nil
}

filter := findFilter{
Key: lockKeyUniqueValue,
}
filter := findFilter{
Key: lockKeyUniqueValue,
}

ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter)
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter)
defer cancel()

if err != nil {
return err
}
return nil
if err != nil {
return err
}
return nil
})
}
13 changes: 1 addition & 12 deletions database/mongodb/mongodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,18 +221,7 @@ func TestLockWorks(t *testing.T) {
t.Fatal(err)
}

// disable locking, validate wer can lock twice
mc.config.Locking.Enabled = false
err = mc.Lock()
if err != nil {
t.Fatal(err)
}
err = mc.Lock()
if err != nil {
t.Fatal(err)
}

// re-enable locking,
// enable locking,
//try to hit a lock conflict
mc.config.Locking.Enabled = true
mc.config.Locking.Timeout = 1
Expand Down
Loading

0 comments on commit 3dfb0ff

Please sign in to comment.