Skip to content

Commit

Permalink
Allow Lockable to be passed custom column, key, and Postgres advisory…
Browse files Browse the repository at this point in the history
… lock/unlock function (#273)
  • Loading branch information
bensheldon authored Jun 24, 2021
1 parent f7df44a commit 129691c
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 40 deletions.
124 changes: 85 additions & 39 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,25 @@ module Lockable
RecordAlreadyAdvisoryLockedError = Class.new(StandardError)

included do
# Default column to be used when creating Advisory Locks
cattr_accessor(:advisory_lockable_column, instance_accessor: false) { primary_key }

# Default Postgres function to be used for Advisory Locks
cattr_accessor(:advisory_lockable_function) { "pg_try_advisory_lock" }

# Attempt to acquire an advisory lock on the selected records and
# return only those records for which a lock could be acquired.
# @!method advisory_lock
# @!method advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function)
# @!scope class
# @param column [String, Symbol] column values to Advisory Lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [ActiveRecord::Relation]
# A relation selecting only the records that were locked.
scope :advisory_lock, (lambda do
scope :advisory_lock, (lambda do |column: advisory_lockable_column, function: advisory_lockable_function|
original_query = self

cte_table = Arel::Table.new(:rows)
cte_query = original_query.select(primary_key).except(:limit)
cte_query = original_query.select(primary_key, column).except(:limit)
cte_type = if supports_cte_materialization_specifiers?
'MATERIALIZED'
else
Expand All @@ -41,9 +49,14 @@ module Lockable

composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' ')))

# In addition to an advisory lock, there is also a FOR UPDATE SKIP LOCKED
# because this causes the query to skip jobs that were completed (and deleted)
# by another session in the time since the table snapshot was taken.
# In rare cases under high concurrency levels, leaving this out can result in double executions.
query = cte_table.project(cte_table[:id])
.with(composed_cte)
.where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))
.where(Arel.sql(sanitize_sql_for_conditions(["#{function}(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))
.lock(Arel.sql("FOR UPDATE SKIP LOCKED"))

limit = original_query.arel.ast.limit
query.limit = limit.value if limit.present?
Expand All @@ -57,40 +70,44 @@ module Lockable
#
# For details on +pg_locks+, see
# {https://www.postgresql.org/docs/current/view-pg-locks.html}.
# @!method joins_advisory_locks
# @!method joins_advisory_locks(column: advisory_lockable_column)
# @!scope class
# @param column [String, Symbol] column values to Advisory Lock against
# @return [ActiveRecord::Relation]
# @example Get the records that have a session awaiting a lock:
# MyLockableRecord.joins_advisory_locks.where("pg_locks.granted = ?", false)
scope :joins_advisory_locks, (lambda do
scope :joins_advisory_locks, (lambda do |column: advisory_lockable_column|
join_sql = <<~SQL.squish
LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int
SQL

joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }]))
end)

# Find records that do not have an advisory lock on them.
# @!method advisory_unlocked
# @!method advisory_unlocked(column: advisory_lockable_column)
# @!scope class
# @param column [String, Symbol] column values to Advisory Lock against
# @return [ActiveRecord::Relation]
scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) }
scope :advisory_unlocked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where(pg_locks: { locktype: nil }) }

# Find records that have an advisory lock on them.
# @!method advisory_locked
# @!method advisory_locked(column: advisory_lockable_column)
# @!scope class
# @param column [String, Symbol] column values to Advisory Lock against
# @return [ActiveRecord::Relation]
scope :advisory_locked, -> { joins_advisory_locks.where.not(pg_locks: { locktype: nil }) }
scope :advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where.not(pg_locks: { locktype: nil }) }

# Find records with advisory locks owned by the current Postgres
# session/connection.
# @!method advisory_locked
# @!method advisory_locked(column: advisory_lockable_column)
# @!scope class
# @param column [String, Symbol] column values to Advisory Lock against
# @return [ActiveRecord::Relation]
scope :owns_advisory_locked, -> { joins_advisory_locks.where('"pg_locks"."pid" = pg_backend_pid()') }
scope :owns_advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where('"pg_locks"."pid" = pg_backend_pid()') }

# Whether an advisory lock should be acquired in the same transaction
# that created the record.
Expand Down Expand Up @@ -122,21 +139,26 @@ module Lockable
# can (as in {Lockable.advisory_lock}) and only pass those that could be
# locked to the block.
#
# @param column [String, Symbol] name of advisory lock or unlock function
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @yield [Array<Lockable>] the records that were successfully locked.
# @return [Object] the result of the block.
#
# @example Work on the first two +MyLockableRecord+ objects that could be locked:
# MyLockableRecord.order(created_at: :asc).limit(2).with_advisory_lock do |record|
# do_something_with record
# end
def with_advisory_lock
def with_advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function)
raise ArgumentError, "Must provide a block" unless block_given?

records = advisory_lock.to_a
records = advisory_lock(column: column, function: function).to_a
begin
yield(records)
ensure
records.each(&:advisory_unlock)
records.each do |record|
key = [table_name, record[advisory_lockable_column]].join
record.advisory_unlock(key: key, function: advisory_unlockable_function(function))
end
end
end

Expand All @@ -145,49 +167,63 @@ def supports_cte_materialization_specifiers?

@_supports_cte_materialization_specifiers = connection.postgresql_version >= 120000
end

# Postgres advisory unlocking function for the class
# @param function [String, Symbol] name of advisory lock or unlock function
# @return [Boolean]
def advisory_unlockable_function(function = advisory_lockable_function)
function.to_s.sub("_lock", "_unlock").sub("_try_", "_")
end
end

# Acquires an advisory lock on this record if it is not already locked by
# another database session. Be careful to ensure you release the lock when
# you are done with {#advisory_unlock} (or {#advisory_unlock!} to release
# all remaining locks).
# @param key [String, Symbol] Key to Advisory Lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [Boolean] whether the lock was acquired.
def advisory_lock
def advisory_lock(key: lockable_key, function: advisory_lockable_function)
query = <<~SQL.squish
SELECT 1 AS one
WHERE pg_try_advisory_lock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint)
WHERE #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)
SQL
binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]]
binds = [[nil, key]]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).any?
end

# Releases an advisory lock on this record if it is locked by this database
# session. Note that advisory locks stack, so you must call
# {#advisory_unlock} and {#advisory_lock} the same number of times.
# @param key [String, Symbol] Key to lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [Boolean] whether the lock was released.
def advisory_unlock
def advisory_unlock(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function))
query = <<~SQL.squish
SELECT 1 AS one
WHERE pg_advisory_unlock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint)
WHERE #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)
SQL
binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]]
binds = [[nil, key]]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).any?
end

# Acquires an advisory lock on this record or raises
# {RecordAlreadyAdvisoryLockedError} if it is already locked by another
# database session.
# @param key [String, Symbol] Key to lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @raise [RecordAlreadyAdvisoryLockedError]
# @return [Boolean] +true+
def advisory_lock!
result = advisory_lock
def advisory_lock!(key: lockable_key, function: advisory_lockable_function)
result = advisory_lock(key: key, function: function)
result || raise(RecordAlreadyAdvisoryLockedError)
end

# Acquires an advisory lock on this record and safely releases it after the
# passed block is completed. If the record is locked by another database
# session, this raises {RecordAlreadyAdvisoryLockedError}.
#
# @param key [String, Symbol] Key to lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @yield Nothing
# @return [Object] The result of the block.
#
Expand All @@ -196,51 +232,61 @@ def advisory_lock!
# record.with_advisory_lock do
# do_something_with record
# end
def with_advisory_lock
def with_advisory_lock(key: lockable_key, function: advisory_lockable_function)
raise ArgumentError, "Must provide a block" unless block_given?

advisory_lock!
advisory_lock!(key: key, function: function)
yield
ensure
advisory_unlock unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError
advisory_unlock(key: key, function: self.class.advisory_unlockable_function(function)) unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError
end

# Tests whether this record has an advisory lock on it.
# @param key [String, Symbol] Key to test lock against
# @return [Boolean]
def advisory_locked?
def advisory_locked?(key: lockable_key)
query = <<~SQL.squish
SELECT 1 AS one
FROM pg_locks
WHERE pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int
SQL
binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]]
binds = [[nil, key], [nil, key]]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any?
end

# Tests whether this record is locked by the current database session.
# @param key [String, Symbol] Key to test lock against
# @return [Boolean]
def owns_advisory_lock?
def owns_advisory_lock?(key: lockable_key)
query = <<~SQL.squish
SELECT 1 AS one
FROM pg_locks
WHERE pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.pid = pg_backend_pid()
SQL
binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]]
binds = [[nil, key], [nil, key]]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Owns Advisory Lock?', binds).any?
end

# Releases all advisory locks on the record that are held by the current
# database session.
# @param key [String, Symbol] Key to lock against
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [void]
def advisory_unlock!
advisory_unlock while advisory_locked?
def advisory_unlock!(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function))
advisory_unlock(key: key, function: function) while advisory_locked?
end

# Default Advisory Lock key
# @return [String]
def lockable_key
[self.class.table_name, self[self.class.advisory_lockable_column]].join
end

private
Expand Down
44 changes: 43 additions & 1 deletion spec/lib/good_job/lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

RSpec.describe GoodJob::Lockable do
let(:model_class) { GoodJob::Job }
let(:job) { model_class.create! }
let(:job) { model_class.create(queue_name: "default") }

describe '.advisory_lock' do
around do |example|
Expand Down Expand Up @@ -35,6 +35,30 @@
FROM "rows"
WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."id"::text), 1, 16))::bit(64)::bigint)
LIMIT 2
FOR UPDATE SKIP LOCKED
)
ORDER BY "good_jobs"."priority" DESC
SQL
end

it 'can be customized with `lockable_column`' do
allow(model_class).to receive(:advisory_lockable_column).and_return("queue_name")
query = model_class.order(priority: :desc).limit(2).advisory_lock

expect(normalize_sql(query.to_sql)).to eq normalize_sql(<<~SQL.squish)
SELECT "good_jobs".*
FROM "good_jobs"
WHERE "good_jobs"."id" IN (
WITH "rows" AS #{'MATERIALIZED' if model_class.supports_cte_materialization_specifiers?} (
SELECT "good_jobs"."id", "good_jobs"."queue_name"
FROM "good_jobs"
ORDER BY "good_jobs"."priority" DESC
)
SELECT "rows"."id"
FROM "rows"
WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."queue_name"::text), 1, 16))::bit(64)::bigint)
LIMIT 2
FOR UPDATE SKIP LOCKED
)
ORDER BY "good_jobs"."priority" DESC
SQL
Expand All @@ -48,6 +72,16 @@

job.advisory_unlock
end

it 'can lock an alternative column' do
expect(job).not_to be_advisory_locked
result_job = model_class.advisory_lock(column: :queue_name).first
expect(result_job).to eq job
expect(job).to be_advisory_locked(key: "good_jobsdefault")
expect(job).not_to be_advisory_locked # on default key

job.advisory_unlock(key: "good_jobsdefault")
end
end

describe '.with_advisory_lock' do
Expand Down Expand Up @@ -81,6 +115,14 @@

job.advisory_unlock
end

it 'can lock alternative values' do
job.advisory_lock!(key: "alternative")
expect(job.advisory_locked?(key: "alternative")).to be true
expect(job.advisory_locked?).to be false

job.advisory_unlock(key: "alternative")
end
end

describe '#advisory_unlock' do
Expand Down

0 comments on commit 129691c

Please sign in to comment.