Skip to content

Commit

Permalink
refactor and address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Sep 3, 2024
1 parent d3dbbfd commit e1a2c92
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 58 deletions.
31 changes: 16 additions & 15 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
class NoLockableJobs < StandardError; end
FindJobSecondsTotal = Prometheus::Client::Counter.new(
:que_find_job_seconds_total,
docstring: "Seconds spent finding a job",
Expand Down Expand Up @@ -45,25 +46,25 @@ def execute(command, params = [])
end

def lock_job_with_lock_database(queue, cursor)
result = []
job_to_lock = []
loop do
result = Que.transaction do
observe(nil, FindJobSecondsTotal, queue: queue) do
result = Que.execute(:find_job_to_lock, [queue, cursor])
end
observe(duration_metric: FindJobSecondsTotal, labels: { queue: queue }) do
locked_job = Que.transaction do
job_to_lock = Que.execute(:find_job_to_lock, [queue, cursor])

return result if result.empty?
return job_to_lock if job_to_lock.empty?

cursor = result.first["job_id"]
job_locked = pg_try_advisory_lock?(cursor)
cursor = job_to_lock.first["job_id"]
job_locked = pg_try_advisory_lock?(cursor)

observe(FindJobHitTotal, nil, { queue: queue, job_hit: job_locked })
return result if job_locked
observe(count_metric: FindJobHitTotal, labels: { queue: queue, job_hit: job_locked })
return job_to_lock if job_locked
end
return locked_job if locked_job
end
break if result
end

result
locked_job
end

def pg_try_advisory_lock?(job_id)
Expand All @@ -85,12 +86,12 @@ def unlock_job(job_id)
end
end

def observe(metric, metric_duration, labels = {})
def observe(count_metric: nil, duration_metric: nil, labels: {})
now = monotonic_now
yield if block_given?
ensure
metric&.increment(labels: labels)
metric_duration&.increment(
count_metric&.increment(labels: labels)
duration_metric&.increment(
by: monotonic_now - now,
labels: labels,
)
Expand Down
5 changes: 3 additions & 2 deletions lib/que/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ module Que
AND run_at <= now()
AND retryable = true
AND job_id >= $2
ORDER BY priority, run_at, job_id
for update skip locked LIMIT 1
ORDER BY priority, run_at, job_id
FOR UPDATE SKIP LOCKED
LIMIT 1
},
}
# rubocop:enable Style/MutableConstant
Expand Down
10 changes: 1 addition & 9 deletions spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "ubuntu"),
user: ENV.fetch("LOCK_PGUSER", "postgres"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5434),
Expand All @@ -28,11 +28,3 @@ def active_record_with_lock_adapter_connection
lock_connection_pool: LockDatabaseRecord.connection_pool,
)
end

RSpec.configure do |config|
if ENV["ADAPTER"] == "ActiveRecordWithLock"
config.filter_run_including :active_record_with_lock
else
config.filter_run_excluding :active_record_with_lock
end
end
33 changes: 1 addition & 32 deletions spec/integration/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,7 @@

# rubocop:disable RSpec/DescribeClass
RSpec.describe "multiple workers" do
def with_workers(num, stop_timeout: 5, secondary_queues: [], &block)
Que::WorkerGroup.start(
num,
wake_interval: 0.01,
secondary_queues: secondary_queues,
).tap(&block).stop(stop_timeout)
end

# Wait for a maximum of [timeout] seconds for all jobs to be worked
def wait_for_jobs_to_be_worked(timeout: 10)
start = Time.now
loop do
break if QueJob.count == 0 || Time.now - start > timeout

sleep 0.1
end
end


context "with one worker and many jobs" do
it "works each job exactly once" do
10.times.each { |i| FakeJob.enqueue(i) }
Expand Down Expand Up @@ -119,20 +102,6 @@ def wait_for_jobs_to_be_worked(timeout: 10)
expect(User.count).to eq(3)
expect(User.all.map(&:name).sort).to eq(%w[alice bob charlie])
end

it "increments the metrics", :active_record_with_lock do
CreateUser.enqueue("alice")
CreateUser.enqueue("bob")
CreateUser.enqueue("charlie")
expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment).
with({ :labels => { :job_hit => false, :queue => "default" } }).at_least(:once).and_call_original
expect(Que::Adapters::ActiveRecordWithLock::FindJobHitTotal).to receive(:increment).
with({ :labels => { :job_hit => true, :queue => "default" } }).
exactly(3).times.and_call_original
expect(QueJob.count).to eq(3)

with_workers(5) { wait_for_jobs_to_be_worked }
end
end

context "with jobs that exceed stop timeout" do
Expand Down
37 changes: 37 additions & 0 deletions spec/lib/que/adapters/active_record_with_lock_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

require "spec_helper"

RSpec.describe Que::Adapters::ActiveRecordWithLock do
subject(:adapter) do
described_class.new( job_connection_pool: JobRecord.connection_pool,
lock_connection_pool: LockDatabaseRecord.connection_pool
)
end
around do |example|
Que.adapter.tap do |old_adapter|
# We need this to avoid errors related to prepared statements
if old_adapter.class != described_class
Que.adapter = adapter
example.run
Que.adapter = old_adapter
else
example.run
end
end
end

before do
described_class::FindJobHitTotal.values.each { |labels, _| labels.clear }
10.times do
FakeJob.enqueue(1)
end
end

describe ".lock_job_with_lock_database" do
it "sets metric values from queue" do
with_workers(5) { wait_for_jobs_to_be_worked }
expect(described_class::FindJobHitTotal.values[{:queue=>"default", :job_hit=>"true"}]).to eql(10.0)
end
end
end
18 changes: 18 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,21 @@ def establish_database_connection
Prometheus::Client.registry.instance_eval { @metrics.clear }
end
end

def with_workers(num, stop_timeout: 5, secondary_queues: [], &block)
Que::WorkerGroup.start(
num,
wake_interval: 0.01,
secondary_queues: secondary_queues,
).tap(&block).stop(stop_timeout)
end

# Wait for a maximum of [timeout] seconds for all jobs to be worked
def wait_for_jobs_to_be_worked(timeout: 10)
start = Time.now
loop do
break if QueJob.count == 0 || Time.now - start > timeout

sleep 0.1
end
end

0 comments on commit e1a2c92

Please sign in to comment.