From 129691c9a01d827e3c2889bb6a73568b0c2b854c Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Thu, 24 Jun 2021 10:06:21 -0700 Subject: [PATCH] Allow Lockable to be passed custom column, key, and Postgres advisory lock/unlock function (#273) --- lib/good_job/lockable.rb | 124 ++++++++++++++++++++--------- spec/lib/good_job/lockable_spec.rb | 44 +++++++++- 2 files changed, 128 insertions(+), 40 deletions(-) diff --git a/lib/good_job/lockable.rb b/lib/good_job/lockable.rb index b81670a5f..561c4e456 100644 --- a/lib/good_job/lockable.rb +++ b/lib/good_job/lockable.rb @@ -22,17 +22,25 @@ module Lockable RecordAlreadyAdvisoryLockedError = Class.new(StandardError) included do + # Default column to be used when creating Advisory Locks + cattr_accessor(:advisory_lockable_column, instance_accessor: false) { primary_key } + + # Default Postgres function to be used for Advisory Locks + cattr_accessor(:advisory_lockable_function) { "pg_try_advisory_lock" } + # Attempt to acquire an advisory lock on the selected records and # return only those records for which a lock could be acquired. - # @!method advisory_lock + # @!method advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [ActiveRecord::Relation] # A relation selecting only the records that were locked. - scope :advisory_lock, (lambda do + scope :advisory_lock, (lambda do |column: advisory_lockable_column, function: advisory_lockable_function| original_query = self cte_table = Arel::Table.new(:rows) - cte_query = original_query.select(primary_key).except(:limit) + cte_query = original_query.select(primary_key, column).except(:limit) cte_type = if supports_cte_materialization_specifiers? 'MATERIALIZED' else @@ -41,9 +49,14 @@ module Lockable composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' '))) + # In addition to an advisory lock, there is also a FOR UPDATE SKIP LOCKED + # because this causes the query to skip jobs that were completed (and deleted) + # by another session in the time since the table snapshot was taken. + # In rare cases under high concurrency levels, leaving this out can result in double executions. query = cte_table.project(cte_table[:id]) .with(composed_cte) - .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .where(Arel.sql(sanitize_sql_for_conditions(["#{function}(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .lock(Arel.sql("FOR UPDATE SKIP LOCKED")) limit = original_query.arel.ast.limit query.limit = limit.value if limit.present? @@ -57,40 +70,44 @@ module Lockable # # For details on +pg_locks+, see # {https://www.postgresql.org/docs/current/view-pg-locks.html}. - # @!method joins_advisory_locks + # @!method joins_advisory_locks(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] # @example Get the records that have a session awaiting a lock: # MyLockableRecord.joins_advisory_locks.where("pg_locks.granted = ?", false) - scope :joins_advisory_locks, (lambda do + scope :joins_advisory_locks, (lambda do |column: advisory_lockable_column| join_sql = <<~SQL.squish LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }])) end) # Find records that do not have an advisory lock on them. - # @!method advisory_unlocked + # @!method advisory_unlocked(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) } + scope :advisory_unlocked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where(pg_locks: { locktype: nil }) } # Find records that have an advisory lock on them. - # @!method advisory_locked + # @!method advisory_locked(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :advisory_locked, -> { joins_advisory_locks.where.not(pg_locks: { locktype: nil }) } + scope :advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where.not(pg_locks: { locktype: nil }) } # Find records with advisory locks owned by the current Postgres # session/connection. - # @!method advisory_locked + # @!method advisory_locked(column: advisory_lockable_column) # @!scope class + # @param column [String, Symbol] column values to Advisory Lock against # @return [ActiveRecord::Relation] - scope :owns_advisory_locked, -> { joins_advisory_locks.where('"pg_locks"."pid" = pg_backend_pid()') } + scope :owns_advisory_locked, ->(column: advisory_lockable_column) { joins_advisory_locks(column: column).where('"pg_locks"."pid" = pg_backend_pid()') } # Whether an advisory lock should be acquired in the same transaction # that created the record. @@ -122,6 +139,8 @@ module Lockable # can (as in {Lockable.advisory_lock}) and only pass those that could be # locked to the block. # + # @param column [String, Symbol] name of advisory lock or unlock function + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @yield [Array] the records that were successfully locked. # @return [Object] the result of the block. # @@ -129,14 +148,17 @@ module Lockable # MyLockableRecord.order(created_at: :asc).limit(2).with_advisory_lock do |record| # do_something_with record # end - def with_advisory_lock + def with_advisory_lock(column: advisory_lockable_column, function: advisory_lockable_function) raise ArgumentError, "Must provide a block" unless block_given? - records = advisory_lock.to_a + records = advisory_lock(column: column, function: function).to_a begin yield(records) ensure - records.each(&:advisory_unlock) + records.each do |record| + key = [table_name, record[advisory_lockable_column]].join + record.advisory_unlock(key: key, function: advisory_unlockable_function(function)) + end end end @@ -145,49 +167,63 @@ def supports_cte_materialization_specifiers? @_supports_cte_materialization_specifiers = connection.postgresql_version >= 120000 end + + # Postgres advisory unlocking function for the class + # @param function [String, Symbol] name of advisory lock or unlock function + # @return [Boolean] + def advisory_unlockable_function(function = advisory_lockable_function) + function.to_s.sub("_lock", "_unlock").sub("_try_", "_") + end end # Acquires an advisory lock on this record if it is not already locked by # another database session. Be careful to ensure you release the lock when # you are done with {#advisory_unlock} (or {#advisory_unlock!} to release # all remaining locks). + # @param key [String, Symbol] Key to Advisory Lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was acquired. - def advisory_lock + def advisory_lock(key: lockable_key, function: advisory_lockable_function) query = <<~SQL.squish SELECT 1 AS one - WHERE pg_try_advisory_lock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint) + WHERE #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).any? end # Releases an advisory lock on this record if it is locked by this database # session. Note that advisory locks stack, so you must call # {#advisory_unlock} and {#advisory_lock} the same number of times. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [Boolean] whether the lock was released. - def advisory_unlock + def advisory_unlock(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function)) query = <<~SQL.squish SELECT 1 AS one - WHERE pg_advisory_unlock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint) + WHERE #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).any? end # Acquires an advisory lock on this record or raises # {RecordAlreadyAdvisoryLockedError} if it is already locked by another # database session. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @raise [RecordAlreadyAdvisoryLockedError] # @return [Boolean] +true+ - def advisory_lock! - result = advisory_lock + def advisory_lock!(key: lockable_key, function: advisory_lockable_function) + result = advisory_lock(key: key, function: function) result || raise(RecordAlreadyAdvisoryLockedError) end # Acquires an advisory lock on this record and safely releases it after the # passed block is completed. If the record is locked by another database # session, this raises {RecordAlreadyAdvisoryLockedError}. - # + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @yield Nothing # @return [Object] The result of the block. # @@ -196,51 +232,61 @@ def advisory_lock! # record.with_advisory_lock do # do_something_with record # end - def with_advisory_lock + def with_advisory_lock(key: lockable_key, function: advisory_lockable_function) raise ArgumentError, "Must provide a block" unless block_given? - advisory_lock! + advisory_lock!(key: key, function: function) yield ensure - advisory_unlock unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError + advisory_unlock(key: key, function: self.class.advisory_unlockable_function(function)) unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError end # Tests whether this record has an advisory lock on it. + # @param key [String, Symbol] Key to test lock against # @return [Boolean] - def advisory_locked? + def advisory_locked?(key: lockable_key) query = <<~SQL.squish SELECT 1 AS one FROM pg_locks WHERE pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key], [nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any? end # Tests whether this record is locked by the current database session. + # @param key [String, Symbol] Key to test lock against # @return [Boolean] - def owns_advisory_lock? + def owns_advisory_lock?(key: lockable_key) query = <<~SQL.squish SELECT 1 AS one FROM pg_locks WHERE pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int AND pg_locks.pid = pg_backend_pid() SQL - binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]] + binds = [[nil, key], [nil, key]] self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Owns Advisory Lock?', binds).any? end # Releases all advisory locks on the record that are held by the current # database session. + # @param key [String, Symbol] Key to lock against + # @param function [String, Symbol] Postgres Advisory Lock function name to use # @return [void] - def advisory_unlock! - advisory_unlock while advisory_locked? + def advisory_unlock!(key: lockable_key, function: self.class.advisory_unlockable_function(advisory_lockable_function)) + advisory_unlock(key: key, function: function) while advisory_locked? + end + + # Default Advisory Lock key + # @return [String] + def lockable_key + [self.class.table_name, self[self.class.advisory_lockable_column]].join end private diff --git a/spec/lib/good_job/lockable_spec.rb b/spec/lib/good_job/lockable_spec.rb index 78ac9492d..7d0a67eb5 100644 --- a/spec/lib/good_job/lockable_spec.rb +++ b/spec/lib/good_job/lockable_spec.rb @@ -2,7 +2,7 @@ RSpec.describe GoodJob::Lockable do let(:model_class) { GoodJob::Job } - let(:job) { model_class.create! } + let(:job) { model_class.create(queue_name: "default") } describe '.advisory_lock' do around do |example| @@ -35,6 +35,30 @@ FROM "rows" WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."id"::text), 1, 16))::bit(64)::bigint) LIMIT 2 + FOR UPDATE SKIP LOCKED + ) + ORDER BY "good_jobs"."priority" DESC + SQL + end + + it 'can be customized with `lockable_column`' do + allow(model_class).to receive(:advisory_lockable_column).and_return("queue_name") + query = model_class.order(priority: :desc).limit(2).advisory_lock + + expect(normalize_sql(query.to_sql)).to eq normalize_sql(<<~SQL.squish) + SELECT "good_jobs".* + FROM "good_jobs" + WHERE "good_jobs"."id" IN ( + WITH "rows" AS #{'MATERIALIZED' if model_class.supports_cte_materialization_specifiers?} ( + SELECT "good_jobs"."id", "good_jobs"."queue_name" + FROM "good_jobs" + ORDER BY "good_jobs"."priority" DESC + ) + SELECT "rows"."id" + FROM "rows" + WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."queue_name"::text), 1, 16))::bit(64)::bigint) + LIMIT 2 + FOR UPDATE SKIP LOCKED ) ORDER BY "good_jobs"."priority" DESC SQL @@ -48,6 +72,16 @@ job.advisory_unlock end + + it 'can lock an alternative column' do + expect(job).not_to be_advisory_locked + result_job = model_class.advisory_lock(column: :queue_name).first + expect(result_job).to eq job + expect(job).to be_advisory_locked(key: "good_jobsdefault") + expect(job).not_to be_advisory_locked # on default key + + job.advisory_unlock(key: "good_jobsdefault") + end end describe '.with_advisory_lock' do @@ -81,6 +115,14 @@ job.advisory_unlock end + + it 'can lock alternative values' do + job.advisory_lock!(key: "alternative") + expect(job.advisory_locked?(key: "alternative")).to be true + expect(job.advisory_locked?).to be false + + job.advisory_unlock(key: "alternative") + end end describe '#advisory_unlock' do