From ee42d610007e60c0c0f1bfc775af20f963b044cb Mon Sep 17 00:00:00 2001 From: Ankitha Damodara Date: Tue, 27 Aug 2024 14:09:22 +0100 Subject: [PATCH] Lock the rows when taking the advisory lock --- lib/que/adapters/active_record_with_lock.rb | 47 ++++++++++++++++--- lib/que/middleware/worker_collector.rb | 1 + lib/que/sql.rb | 1 + spec/integration/integration_spec.rb | 18 ------- .../adapters/active_record_with_lock_spec.rb | 42 +++++++++++++++++ spec/spec_helper.rb | 31 ++++++++++++ 6 files changed, 116 insertions(+), 24 deletions(-) create mode 100644 spec/lib/que/adapters/active_record_with_lock_spec.rb diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index 2e43644..f23ad21 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -3,6 +3,20 @@ module Que module Adapters class ActiveRecordWithLock < Que::Adapters::ActiveRecord + METRICS = [ + FindJobSecondsTotal = Prometheus::Client::Counter.new( + :que_find_job_seconds_total, + docstring: "Seconds spent finding a job", + labels: %i[queue], + ), + + FindJobHitTotal = Prometheus::Client::Counter.new( + :que_find_job_hit_total, + docstring: "total number of job hit and misses when acquiring a lock", + labels: %i[queue job_hit], + ), + ].freeze + def initialize(job_connection_pool:, lock_connection_pool:) @job_connection_pool = job_connection_pool @lock_connection_pool = lock_connection_pool @@ -32,17 +46,23 @@ def execute(command, params = []) end end + # This method continues looping through the que_jobs table until it either + # locks a job successfully or determines that there are no jobs to process. def lock_job_with_lock_database(queue, cursor) - result = [] loop do - result = Que.execute(:find_job_to_lock, [queue, cursor]) + observe(duration_metric: FindJobSecondsTotal, labels: { queue: queue }) do + Que.transaction do + job_to_lock = Que.execute(:find_job_to_lock, [queue, cursor]) + return job_to_lock if job_to_lock.empty? - break if result.empty? + cursor = job_to_lock.first["job_id"] + job_locked = pg_try_advisory_lock?(cursor) - cursor = result.first["job_id"] - break if pg_try_advisory_lock?(cursor) + observe(count_metric: FindJobHitTotal, labels: { queue: queue, job_hit: job_locked }) + return job_to_lock if job_locked + end + end end - result end def pg_try_advisory_lock?(job_id) @@ -63,6 +83,21 @@ def unlock_job(job_id) conn.execute("SELECT pg_advisory_unlock(#{job_id})") end end + + def observe(count_metric: nil, duration_metric: nil, labels: {}) + now = monotonic_now + yield if block_given? + ensure + count_metric&.increment(labels: labels) + duration_metric&.increment( + by: monotonic_now - now, + labels: labels, + ) + end + + def monotonic_now + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end end end end diff --git a/lib/que/middleware/worker_collector.rb b/lib/que/middleware/worker_collector.rb index 9d498b9..c90fb70 100644 --- a/lib/que/middleware/worker_collector.rb +++ b/lib/que/middleware/worker_collector.rb @@ -18,6 +18,7 @@ def initialize(app, options = {}) register(*WorkerGroup::METRICS) register(*Worker::METRICS) register(*Locker::METRICS) + register(*Adapters::ActiveRecordWithLock::METRICS) end def call(env) diff --git a/lib/que/sql.rb b/lib/que/sql.rb index 20496d5..0de73b6 100644 --- a/lib/que/sql.rb +++ b/lib/que/sql.rb @@ -184,6 +184,7 @@ module Que AND retryable = true AND job_id >= $2 ORDER BY priority, run_at, job_id + FOR UPDATE SKIP LOCKED LIMIT 1 }, } diff --git a/spec/integration/integration_spec.rb b/spec/integration/integration_spec.rb index ffa67a8..f1106d4 100644 --- a/spec/integration/integration_spec.rb +++ b/spec/integration/integration_spec.rb @@ -5,24 +5,6 @@ # rubocop:disable RSpec/DescribeClass RSpec.describe "multiple workers" do - def with_workers(num, stop_timeout: 5, secondary_queues: [], &block) - Que::WorkerGroup.start( - num, - wake_interval: 0.01, - secondary_queues: secondary_queues, - ).tap(&block).stop(stop_timeout) - end - - # Wait for a maximum of [timeout] seconds for all jobs to be worked - def wait_for_jobs_to_be_worked(timeout: 10) - start = Time.now - loop do - break if QueJob.count == 0 || Time.now - start > timeout - - sleep 0.1 - end - end - context "with one worker and many jobs" do it "works each job exactly once" do 10.times.each { |i| FakeJob.enqueue(i) } diff --git a/spec/lib/que/adapters/active_record_with_lock_spec.rb b/spec/lib/que/adapters/active_record_with_lock_spec.rb new file mode 100644 index 0000000..cb37b5c --- /dev/null +++ b/spec/lib/que/adapters/active_record_with_lock_spec.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Que::Adapters::ActiveRecordWithLock, :active_record_with_lock do + subject(:adapter) do + described_class.new(job_connection_pool: JobRecord.connection_pool, + lock_connection_pool: LockDatabaseRecord.connection_pool) + end + + before do + described_class::FindJobHitTotal.values.each { |labels, _| labels.clear } + end + + context "with enqueued jobs" do + before do + 10.times do + FakeJob.enqueue(1) + end + end + + it "sets correct metric values" do + expect(QueJob.count).to eq(10) + with_workers(5) { wait_for_jobs_to_be_worked } + expect(QueJob.count).to eq(0) + expect(described_class::FindJobHitTotal.values[{ :queue => "default", :job_hit => "true" }]).to eq(10.0) + end + end + + describe ".lock_job_with_lock_database" do + subject(:lock_job) { adapter.lock_job_with_lock_database("default", 0) } + + context "with no jobs enqueued" do + it "exists the loop and sets correct metric values" do + expect(QueJob.count).to eq(0) + locked_job = lock_job + expect(locked_job).to eq([]) + expect(described_class::FindJobHitTotal.values[{ :queue => "default", :job_hit => "true" }]).to eq(0.0) + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 1a09615..8baa994 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -44,6 +44,19 @@ def establish_database_connection Que.logger = Logger.new("/dev/null") RSpec.configure do |config| + # Run only specific adapter files based on the adapter class + spec_dir = "./spec/lib" + # Construct the path for the adapter spec file + adapter_spec_class_path = File.join(spec_dir, "#{Que.adapter.class.to_s.underscore}_spec.rb") + + # Exclude patterns for tests in the que/adapters directory + config.exclude_pattern = "**/que/adapters/*.rb" + + # Require the adapter spec file if it exists + if File.exist?(adapter_spec_class_path) + require adapter_spec_class_path + end + config.before do QueJob.delete_all FakeJob.log = [] @@ -57,3 +70,21 @@ def establish_database_connection Prometheus::Client.registry.instance_eval { @metrics.clear } end end + +def with_workers(num, stop_timeout: 5, secondary_queues: [], &block) + Que::WorkerGroup.start( + num, + wake_interval: 0.01, + secondary_queues: secondary_queues, + ).tap(&block).stop(stop_timeout) +end + +# Wait for a maximum of [timeout] seconds for all jobs to be worked +def wait_for_jobs_to_be_worked(timeout: 10) + start = Time.now + loop do + break if QueJob.count == 0 || Time.now - start > timeout + + sleep 0.1 + end +end