Skip to content

Commit

Permalink
AdvisoryLockable: Abort record create if with_advisory_lock fails to …
Browse files Browse the repository at this point in the history
…acquire advisory lock (#1078)
  • Loading branch information
bensheldon authored Sep 19, 2023
1 parent 1be5220 commit 6c60a4a
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 15 deletions.
64 changes: 49 additions & 15 deletions app/models/concerns/good_job/advisory_lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ module AdvisoryLockable
# @return [Boolean]
attr_accessor :create_with_advisory_lock

after_create -> { advisory_lock }, if: :create_with_advisory_lock
after_create lambda {
advisory_lock || begin
errors.add(self.class.advisory_lockable_column, "Failed to acquire advisory lock: #{lockable_key}")
raise ActiveRecord::RecordInvalid # do not reference the record because it can cause I18n missing translation error
end
}, if: :create_with_advisory_lock
end

class_methods do
Expand Down Expand Up @@ -222,6 +227,47 @@ def advisory_unlock_key(key, function: advisory_unlockable_function)
connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).first['unlocked']
end

# Tests whether the provided key has an advisory lock on it.
# @param key [String, Symbol] Key to test lock against
# @return [Boolean]
def advisory_locked_key?(key)
query = <<~SQL.squish
SELECT 1 AS one
FROM pg_locks
WHERE pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int
LIMIT 1
SQL
binds = [
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
]
connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any?
end

# Tests whether this record is locked by the current database session.
# @param key [String, Symbol] Key to test lock against
# @return [Boolean]
def owns_advisory_lock_key?(key)
query = <<~SQL.squish
SELECT 1 AS one
FROM pg_locks
WHERE pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.pid = pg_backend_pid()
LIMIT 1
SQL
binds = [
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
]
connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Owns Advisory Lock?', binds).any?
end

def _advisory_lockable_column
advisory_lockable_column || primary_key
end
Expand Down Expand Up @@ -318,20 +364,7 @@ def with_advisory_lock(key: lockable_key, function: advisory_lockable_function)
# @param key [String, Symbol] Key to test lock against
# @return [Boolean]
def advisory_locked?(key: lockable_key)
query = <<~SQL.squish
SELECT 1 AS one
FROM pg_locks
WHERE pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int
LIMIT 1
SQL
binds = [
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new),
]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any?
self.class.advisory_locked_key?(key)
end

# Tests whether this record does not have an advisory lock on it.
Expand All @@ -345,6 +378,7 @@ def advisory_unlocked?(key: lockable_key)
# @param key [String, Symbol] Key to test lock against
# @return [Boolean]
def owns_advisory_lock?(key: lockable_key)
self.class.owns_advisory_lock_key?(key)
query = <<~SQL.squish
SELECT 1 AS one
FROM pg_locks
Expand Down
121 changes: 121 additions & 0 deletions spec/app/models/concerns/good_job/advisory_lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
it 'locks a key' do
model_class.advisory_lock_key(execution.lockable_key)
expect(execution).to be_advisory_locked
expect(model_class.advisory_locked_key?(execution.lockable_key)).to be true
model_class.advisory_unlock_key(execution.lockable_key)
end

Expand Down Expand Up @@ -138,6 +139,42 @@
end
end

describe '.advisory_locked_key?' do
it 'tests whether the key is locked' do
key = SecureRandom.uuid
expect(model_class.advisory_locked_key?(key)).to eq false

model_class.advisory_lock_key(key)
expect(model_class.advisory_locked_key?(key)).to eq true

model_class.advisory_unlock_key(key)
expect(model_class.advisory_locked_key?(key)).to eq false
end
end

describe '.owns_advisory_lock_key?' do
it 'tests whether the key is locked' do
locked_event = Concurrent::Event.new
done_event = Concurrent::Event.new

promise = rails_promise do
model_class.advisory_lock_key(execution.lockable_key) do
expect(execution.owns_advisory_lock?).to be true

locked_event.set
done_event.wait(5)
end
end

locked_event.wait(5)
expect(execution.owns_advisory_lock?).to be false
ensure
locked_event.set
done_event.set
promise.value!
end
end

describe '.with_advisory_lock' do
it 'opens a block with a lock that locks and unlocks records' do
records = nil
Expand Down Expand Up @@ -169,6 +206,29 @@

expect(sql).to eq 'SELECT "good_jobs".* FROM "good_jobs"'
end

it 'aborts save if cannot be advisory locked' do
uuid = SecureRandom.uuid
locked_event = Concurrent::Event.new
done_event = Concurrent::Event.new

promise = rails_promise do
model_class.advisory_lock_key("good_jobs-#{uuid}") do
locked_event.set
done_event.wait(5)
end
end

locked_event.wait(5)
record = model_class.create(id: uuid, active_job_id: uuid, create_with_advisory_lock: true)
expect(record).not_to be_persisted
expect(record.errors[:active_job_id]).to include("Failed to acquire advisory lock: good_jobs-#{uuid}")

expect { model_class.create!(id: uuid, active_job_id: uuid, create_with_advisory_lock: true) }.to raise_error ActiveRecord::RecordInvalid
ensure
done_event.set
promise.value!
end
end

describe '.includes_advisory_locks' do
Expand Down Expand Up @@ -322,4 +382,65 @@

execution.advisory_unlock
end

describe 'Advisory Lock behavior' do
it 'connection-level locks lock immediately within transactions' do
locked_event = Concurrent::Event.new
commit_event = Concurrent::Event.new
committed_event = Concurrent::Event.new
done_event = Concurrent::Event.new

promise = rails_promise do
execution.class.transaction do
execution.advisory_lock
locked_event.set

commit_event.wait(10)
end
committed_event.set

done_event.wait(10)
execution.advisory_unlock
end

locked_event.wait(10)
expect(execution.advisory_locked?).to be true
commit_event.set

committed_event.wait(10)
expect(execution.advisory_locked?).to be true

done_event.set
promise.value!
end

it 'transaction-level locks only lock within transactions' do
locked_event = Concurrent::Event.new
commit_event = Concurrent::Event.new
committed_event = Concurrent::Event.new
done_event = Concurrent::Event.new

promise = rails_promise do
execution.class.transaction do
execution.advisory_lock(function: "pg_advisory_xact_lock")
locked_event.set

commit_event.wait(10)
end
committed_event.set

done_event.wait(10)
end

locked_event.wait(10)
expect(execution.advisory_locked?).to be true
commit_event.set

committed_event.wait(10)
expect(execution.advisory_locked?).to be false

done_event.set
promise.value!
end
end
end

0 comments on commit 6c60a4a

Please sign in to comment.