Skip to content

Commit

Permalink
fix formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Jul 5, 2024
1 parent c291312 commit 4623fab
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 68 deletions.
116 changes: 58 additions & 58 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,73 @@

# https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb
module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
attr_accessor :job_connection_pool, :lock_record
def initialize(job_connection_pool:, lock_record:)
@job_connection_pool = job_connection_pool
@lock_record = lock_record
super
end
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
def initialize(job_connection_pool:, lock_record:)
@job_connection_pool = job_connection_pool
@lock_record = lock_record
super
end

def checkout_activerecord_adapter(&block)
@job_connection_pool.with_connection(&block)
end
def checkout_activerecord_adapter(&block)
@job_connection_pool.with_connection(&block)
end

def lock_database_connection
if Thread.current[:db_connection]
return Thread.current[:db_connection] if Thread.current[:db_connection].active?
end
# We are storing this in thread variable here to make sure
# same connection is used to acquire and release the advisory locks.
# Advisory lock will not be released if any other connection from the
# pool tries to release the lock
Thread.current[:db_connection] = @lock_record.connection
def lock_database_connection
if Thread.current[:db_connection]
return Thread.current[:db_connection] if Thread.current[:db_connection].active?
end
# We are storing this in thread variable here to make sure
# same connection is used to acquire and release the advisory locks.
# Advisory lock will not be released if any other connection from the
# pool tries to release the lock
Thread.current[:db_connection] = @lock_record.connection
end

def execute(command, params=[])
case command
when :lock_job then
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
when :unlock_job then
job_id = params[0]
unlock_job(job_id)
else
super(command, params)
end
def execute(command, params = [])
case command
when :lock_job
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
when :unlock_job
job_id = params[0]
unlock_job(job_id)
else
super(command, params)
end
end

def lock_job_with_lock_database(queue, cursor)
result = []
loop do
result = Que.execute(:find_job_to_lock, [queue, cursor])
break if result.empty?
cursor = result.first['job_id']
if pg_try_advisory_lock?(cursor)
break
end
end
return result
end
def lock_job_with_lock_database(queue, cursor)
result = []
loop do
result = Que.execute(:find_job_to_lock, [queue, cursor])
break if result.empty?

def cleanup!
@job_connection_pool.release_connection
@lock_record.remove_connection
cursor = result.first["job_id"]
break if pg_try_advisory_lock?(cursor)
end
result
end

def pg_try_advisory_lock?(job_id)
lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").try(:first)&.fetch('pg_try_advisory_lock')
end
def cleanup!
@job_connection_pool.release_connection
@lock_record.remove_connection
end

def unlock_job(job_id)
# If for any reason the connection that is used to get this advisory lock
# is corrupted, the lock on this job_id would already be released when the
# connection holding the lock goes bad.
# Now, if a new connection tries to release the non existing lock this would just no op
# by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock"
lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})")
end
def pg_try_advisory_lock?(job_id)
lock_database_connection.execute(
"SELECT pg_try_advisory_lock(#{job_id})",
).try(:first)&.fetch("pg_try_advisory_lock")
end

def unlock_job(job_id)
# If for any reason the connection that is used to get this advisory lock
# is corrupted, the lock on this job_id would already be released when the
# connection holding the lock goes bad.
# Now, if a new connection tries to release the non existing lock this would just no op
# by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock"
lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def lock_job_query(queue, cursor)

def handle_expired_cursors!
@consolidated_queues.each do |queue|
queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now)
queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now)
reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now
end
end
Expand Down
7 changes: 1 addition & 6 deletions spec/lib/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
let(:queue) { "default" }
let(:cursor_expiry) { 0 }



describe ".with_locked_job" do
before { allow(Que).to receive(:execute).and_call_original }

Expand Down Expand Up @@ -60,9 +58,6 @@ def expect_to_lock_with(cursor:)
end

context "with just one job to lock" do
before do
described_class.instance_variable_set(:@queue_cursors, [0])
end
let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs }
let(:cursor_expiry) { 60 }

Expand Down Expand Up @@ -122,7 +117,7 @@ def expect_to_lock_with(cursor:)
expect_to_lock_with(cursor: job_1[:job_id])
expect_to_work(job_2)

@epoch += (cursor_expiry) # our cursor should now expire
@epoch += cursor_expiry # our cursor should now expire
expect_to_lock_with(cursor: 0)
expect_to_work(job_3)
end
Expand Down
6 changes: 3 additions & 3 deletions spec/lib/que/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class ExceptionalJobWithCustomLogging < ExceptionalJob
FakeJob.enqueue(1)

expect(Que).
to receive(:execute).with(:lock_job, include("default", 0)).and_raise(PG::Error)
to receive(:execute).with(:lock_job, ["default", 0]).and_raise(PG::Error)
expect(subject).to eq(:postgres_error)
end
end
Expand All @@ -216,7 +216,7 @@ class ExceptionalJobWithCustomLogging < ExceptionalJob
FakeJob.enqueue(1)

expect(Que).
to receive(:execute).with(:lock_job, include("default", 0)).
to receive(:execute).with(:lock_job, ["default", 0]).
and_raise(ActiveRecord::ConnectionTimeoutError)
expect(subject).to eq(:postgres_error)
end
Expand All @@ -227,7 +227,7 @@ class ExceptionalJobWithCustomLogging < ExceptionalJob
FakeJob.enqueue(1)

expect(Que).
to receive(:execute).with(:lock_job, include("default", 0)).
to receive(:execute).with(:lock_job, ["default", 0]).
and_raise(ActiveRecord::ConnectionNotEstablished)
expect(subject).to eq(:postgres_error)
end
Expand Down

0 comments on commit 4623fab

Please sign in to comment.