diff --git a/app/models/concerns/good_job/advisory_lockable.rb b/app/models/concerns/good_job/advisory_lockable.rb index 23c57983a..4acf46c78 100644 --- a/app/models/concerns/good_job/advisory_lockable.rb +++ b/app/models/concerns/good_job/advisory_lockable.rb @@ -133,7 +133,12 @@ module AdvisoryLockable # @return [Boolean] attr_accessor :create_with_advisory_lock - after_create -> { advisory_lock }, if: :create_with_advisory_lock + after_create lambda { + advisory_lock || begin + errors.add(self.class.advisory_lockable_column, "Failed to acquire advisory lock: #{lockable_key}") + raise ActiveRecord::RecordInvalid # do not reference the record because it can cause I18n missing translation error + end + }, if: :create_with_advisory_lock end class_methods do @@ -222,6 +227,47 @@ def advisory_unlock_key(key, function: advisory_unlockable_function) connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).first['unlocked'] end + # Tests whether the provided key has an advisory lock on it. + # @param key [String, Symbol] Key to test lock against + # @return [Boolean] + def advisory_locked_key?(key) + query = <<~SQL.squish + SELECT 1 AS one + FROM pg_locks + WHERE pg_locks.locktype = 'advisory' + AND pg_locks.objsubid = 1 + AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int + LIMIT 1 + SQL + binds = [ + ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), + ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), + ] + connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any? + end + + # Tests whether this record is locked by the current database session. + # @param key [String, Symbol] Key to test lock against + # @return [Boolean] + def owns_advisory_lock_key?(key) + query = <<~SQL.squish + SELECT 1 AS one + FROM pg_locks + WHERE pg_locks.locktype = 'advisory' + AND pg_locks.objsubid = 1 + AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.pid = pg_backend_pid() + LIMIT 1 + SQL + binds = [ + ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), + ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), + ] + connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Owns Advisory Lock?', binds).any? + end + def _advisory_lockable_column advisory_lockable_column || primary_key end @@ -318,20 +364,7 @@ def with_advisory_lock(key: lockable_key, function: advisory_lockable_function) # @param key [String, Symbol] Key to test lock against # @return [Boolean] def advisory_locked?(key: lockable_key) - query = <<~SQL.squish - SELECT 1 AS one - FROM pg_locks - WHERE pg_locks.locktype = 'advisory' - AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int - LIMIT 1 - SQL - binds = [ - ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), - ActiveRecord::Relation::QueryAttribute.new('key', key, ActiveRecord::Type::String.new), - ] - self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any? + self.class.advisory_locked_key?(key) end # Tests whether this record does not have an advisory lock on it. @@ -345,6 +378,7 @@ def advisory_unlocked?(key: lockable_key) # @param key [String, Symbol] Key to test lock against # @return [Boolean] def owns_advisory_lock?(key: lockable_key) + self.class.owns_advisory_lock_key?(key) query = <<~SQL.squish SELECT 1 AS one FROM pg_locks diff --git a/spec/app/models/concerns/good_job/advisory_lockable_spec.rb b/spec/app/models/concerns/good_job/advisory_lockable_spec.rb index d86427ec9..d67bbeb7a 100644 --- a/spec/app/models/concerns/good_job/advisory_lockable_spec.rb +++ b/spec/app/models/concerns/good_job/advisory_lockable_spec.rb @@ -111,6 +111,7 @@ it 'locks a key' do model_class.advisory_lock_key(execution.lockable_key) expect(execution).to be_advisory_locked + expect(model_class.advisory_locked_key?(execution.lockable_key)).to be true model_class.advisory_unlock_key(execution.lockable_key) end @@ -138,6 +139,42 @@ end end + describe '.advisory_locked_key?' do + it 'tests whether the key is locked' do + key = SecureRandom.uuid + expect(model_class.advisory_locked_key?(key)).to eq false + + model_class.advisory_lock_key(key) + expect(model_class.advisory_locked_key?(key)).to eq true + + model_class.advisory_unlock_key(key) + expect(model_class.advisory_locked_key?(key)).to eq false + end + end + + describe '.owns_advisory_lock_key?' do + it 'tests whether the key is locked' do + locked_event = Concurrent::Event.new + done_event = Concurrent::Event.new + + promise = rails_promise do + model_class.advisory_lock_key(execution.lockable_key) do + expect(execution.owns_advisory_lock?).to be true + + locked_event.set + done_event.wait(5) + end + end + + locked_event.wait(5) + expect(execution.owns_advisory_lock?).to be false + ensure + locked_event.set + done_event.set + promise.value! + end + end + describe '.with_advisory_lock' do it 'opens a block with a lock that locks and unlocks records' do records = nil @@ -169,6 +206,29 @@ expect(sql).to eq 'SELECT "good_jobs".* FROM "good_jobs"' end + + it 'aborts save if cannot be advisory locked' do + uuid = SecureRandom.uuid + locked_event = Concurrent::Event.new + done_event = Concurrent::Event.new + + promise = rails_promise do + model_class.advisory_lock_key("good_jobs-#{uuid}") do + locked_event.set + done_event.wait(5) + end + end + + locked_event.wait(5) + record = model_class.create(id: uuid, active_job_id: uuid, create_with_advisory_lock: true) + expect(record).not_to be_persisted + expect(record.errors[:active_job_id]).to include("Failed to acquire advisory lock: good_jobs-#{uuid}") + + expect { model_class.create!(id: uuid, active_job_id: uuid, create_with_advisory_lock: true) }.to raise_error ActiveRecord::RecordInvalid + ensure + done_event.set + promise.value! + end end describe '.includes_advisory_locks' do @@ -322,4 +382,65 @@ execution.advisory_unlock end + + describe 'Advisory Lock behavior' do + it 'connection-level locks lock immediately within transactions' do + locked_event = Concurrent::Event.new + commit_event = Concurrent::Event.new + committed_event = Concurrent::Event.new + done_event = Concurrent::Event.new + + promise = rails_promise do + execution.class.transaction do + execution.advisory_lock + locked_event.set + + commit_event.wait(10) + end + committed_event.set + + done_event.wait(10) + execution.advisory_unlock + end + + locked_event.wait(10) + expect(execution.advisory_locked?).to be true + commit_event.set + + committed_event.wait(10) + expect(execution.advisory_locked?).to be true + + done_event.set + promise.value! + end + + it 'transaction-level locks only lock within transactions' do + locked_event = Concurrent::Event.new + commit_event = Concurrent::Event.new + committed_event = Concurrent::Event.new + done_event = Concurrent::Event.new + + promise = rails_promise do + execution.class.transaction do + execution.advisory_lock(function: "pg_advisory_xact_lock") + locked_event.set + + commit_event.wait(10) + end + committed_event.set + + done_event.wait(10) + end + + locked_event.wait(10) + expect(execution.advisory_locked?).to be true + commit_event.set + + committed_event.wait(10) + expect(execution.advisory_locked?).to be false + + done_event.set + promise.value! + end + end end