Skip to content

Commit

Permalink
refactore and add specs
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Aug 29, 2024
1 parent 89fe74f commit c93e650
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 12 deletions.
49 changes: 37 additions & 12 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@
module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
FindJobSecondsTotal = Prometheus::Client::Counter.new(
:que_find_job_seconds_total,
docstring: "Seconds spent finding a job",
labels: %i[queue],
)

FindJobHitTotal = Prometheus::Client::Counter.new(
:que_find_job_total,
docstring: "total number of job hit and misses when acquiring a lock",
labels: %i[queue job_hit],
)

def initialize(job_connection_pool:, lock_connection_pool:)
@job_connection_pool = job_connection_pool
@lock_connection_pool = lock_connection_pool
Expand Down Expand Up @@ -35,22 +47,20 @@ def execute(command, params = [])
def lock_job_with_lock_database(queue, cursor)
result = []
loop do
break_loop = false
Que.transaction do
result = Que.execute(:find_job_to_lock, [queue, cursor])

if result.empty?
break_loop = true
break
result = Que.transaction do
observe(nil, FindJobSecondsTotal, queue: queue) do
result = Que.execute(:find_job_to_lock, [queue, cursor])
end

return result if result.empty?

cursor = result.first["job_id"]
if pg_try_advisory_lock?(cursor)
break_loop = true
break
end
job_locked = pg_try_advisory_lock?(cursor)

observe(FindJobHitTotal, nil, { queue: queue, job_hit: job_locked })
return result if job_locked
end
break if break_loop
break if result
end

result
Expand All @@ -74,6 +84,21 @@ def unlock_job(job_id)
conn.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end

def observe(metric, metric_duration, labels = {})
now = monotonic_now
yield if block_given?
ensure
metric&.increment(labels: labels)
metric_duration&.increment(
by: monotonic_now - now,
labels: labels,
)
end

def monotonic_now
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
end
end
8 changes: 8 additions & 0 deletions spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,11 @@ def active_record_with_lock_adapter_connection
lock_connection_pool: LockDatabaseRecord.connection_pool,
)
end

RSpec.configure do |config|
if ENV["ADAPTER"] == "ActiveRecordWithLock"
config.filter_run_including :active_record_with_lock
else
config.filter_run_excluding :active_record_with_lock
end
end
14 changes: 14 additions & 0 deletions spec/integration/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ def wait_for_jobs_to_be_worked(timeout: 10)
expect(User.count).to eq(3)
expect(User.all.map(&:name).sort).to eq(%w[alice bob charlie])
end

it "increments the metrics", :active_record_with_lock do
CreateUser.enqueue("alice")
CreateUser.enqueue("bob")
CreateUser.enqueue("charlie")
expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment).
with({ :labels => { :job_hit => false, :queue => "default" } }).at_least(:once).and_call_original
expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment).
with({ :labels => { :job_hit => true, :queue => "default" } }).
exactly(3).times.and_call_original
expect(QueJob.count).to eq(3)

with_workers(5) { wait_for_jobs_to_be_worked }
end
end

context "with jobs that exceed stop timeout" do
Expand Down

0 comments on commit c93e650

Please sign in to comment.