Skip to content

Commit

Permalink
Lock the rows when taking the advisory lock
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Aug 27, 2024
1 parent 99061e9 commit 89fe74f
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
19 changes: 15 additions & 4 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,24 @@ def execute(command, params = [])
def lock_job_with_lock_database(queue, cursor)
result = []
loop do
result = Que.execute(:find_job_to_lock, [queue, cursor])
break_loop = false
Que.transaction do
result = Que.execute(:find_job_to_lock, [queue, cursor])

break if result.empty?
if result.empty?
break_loop = true
break
end

cursor = result.first["job_id"]
break if pg_try_advisory_lock?(cursor)
cursor = result.first["job_id"]
if pg_try_advisory_lock?(cursor)
break_loop = true
break
end
end
break if break_loop
end

result
end

Expand Down
2 changes: 1 addition & 1 deletion lib/que/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ module Que
AND retryable = true
AND job_id >= $2
ORDER BY priority, run_at, job_id
LIMIT 1
for update skip locked LIMIT 1
},
}
# rubocop:enable Style/MutableConstant
Expand Down
2 changes: 1 addition & 1 deletion spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "postgres"),
user: ENV.fetch("LOCK_PGUSER", "ubuntu"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5434),
Expand Down

0 comments on commit 89fe74f

Please sign in to comment.